X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=df57d18717e807962432056a14285763986e2f64;hp=14885dfada6859c5b6723875745448956f9a3447;hb=b52b52c2d142cec15ae35e91f878d1063c094bc4;hpb=6712478e79588e73e28c7ccac3afc7ac2368a4f3 diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 14885df..df57d18 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2016, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/lfsck/lfsck_namespace.c @@ -87,7 +87,7 @@ static void lfsck_namespace_assistant_req_fini(const struct lu_env *env, struct lfsck_assistant_req *lar) { struct lfsck_namespace_req *lnr = - container_of0(lar, struct lfsck_namespace_req, lnr_lar); + container_of(lar, struct lfsck_namespace_req, lnr_lar); if (lnr->lnr_lmv != NULL) lfsck_lmv_put(env, lnr->lnr_lmv); @@ -166,6 +166,8 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, dst->ln_time_latest_reset = le64_to_cpu(src->ln_time_latest_reset); dst->ln_linkea_overflow_cleared = le64_to_cpu(src->ln_linkea_overflow_cleared); + dst->ln_agent_entries_repaired = + le64_to_cpu(src->ln_agent_entries_repaired); } static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, @@ -238,6 +240,8 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, dst->ln_time_latest_reset = cpu_to_le64(src->ln_time_latest_reset); dst->ln_linkea_overflow_cleared = cpu_to_le64(src->ln_linkea_overflow_cleared); + dst->ln_agent_entries_repaired = + cpu_to_le64(src->ln_agent_entries_repaired); } static void lfsck_namespace_record_failure(const struct lu_env *env, @@ -308,7 +312,7 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, } if (ns->ln_bitmap_size == 0) { - lad->lad_incomplete = 0; + clear_bit(LAD_INCOMPLETE, &lad->lad_flags); CFS_RESET_BITMAP(bitmap); RETURN(0); @@ 
-322,9 +326,9 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, RETURN(rc >= 0 ? -EINVAL : rc); if (cfs_bitmap_check_empty(bitmap)) - lad->lad_incomplete = 0; + clear_bit(LAD_INCOMPLETE, &lad->lad_flags); else - lad->lad_incomplete = 1; + set_bit(LAD_INCOMPLETE, &lad->lad_flags); RETURN(0); } @@ -527,7 +531,7 @@ int lfsck_namespace_trace_update(const struct lu_env *env, GOTO(log, rc); } - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -558,7 +562,7 @@ int lfsck_namespace_trace_update(const struct lu_env *env, if (new != 0) { rc = dt_insert(env, obj, (const struct dt_rec *)&new, - (const struct dt_key *)key, th, 1); + (const struct dt_key *)key, th); if (rc != 0) GOTO(log, rc); } @@ -592,8 +596,7 @@ int lfsck_namespace_check_exist(const struct lu_env *env, if (unlikely(lfsck_is_dead_obj(obj))) RETURN(LFSCK_NAMEENTRY_DEAD); - rc = dt_lookup(env, dir, (struct dt_rec *)fid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, dir, name, fid); if (rc == -ENOENT) RETURN(LFSCK_NAMEENTRY_REMOVED); @@ -692,7 +695,7 @@ static int lfsck_namespace_links_remove(const struct lu_env *env, LASSERT(dt_object_remote(obj) == 0); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -756,19 +759,41 @@ again: return rc; } +static inline bool linkea_reclen_is_valid(const struct linkea_data *ldata) +{ + if (ldata->ld_reclen <= 0) + return false; + + if ((char *)ldata->ld_lee + ldata->ld_reclen > + (char *)ldata->ld_leh + ldata->ld_leh->leh_len) + return false; + + return true; +} + +static inline bool linkea_entry_is_valid(const struct linkea_data *ldata, + const struct lu_name *cname, + const struct lu_fid *pfid) +{ + if (!linkea_reclen_is_valid(ldata)) + return false; + + if (cname->ln_namelen <= 0 || cname->ln_namelen > NAME_MAX) + return false; + + if (!fid_is_sane(pfid)) + return false; + + return true; +} + 
static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata, struct lu_name *cname, struct lu_fid *pfid, char *buf, const int buflen) { linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid); - if (unlikely(ldata->ld_reclen <= 0 || - ldata->ld_reclen + sizeof(struct link_ea_header) > - ldata->ld_leh->leh_len || - cname->ln_namelen <= 0 || - cname->ln_namelen > NAME_MAX || - cname->ln_namelen >= buflen || - !fid_is_sane(pfid))) + if (unlikely(!linkea_entry_is_valid(ldata, cname, pfid))) return -EINVAL; /* To guarantee the 'name' is terminated with '0'. */ @@ -786,9 +811,7 @@ static void lfsck_linkea_del_buf(struct linkea_data *ldata, /* If current record is corrupted, all the subsequent * records will be dropped. */ - if (unlikely(ldata->ld_reclen <= 0 || - ldata->ld_reclen + sizeof(struct link_ea_header) > - ldata->ld_leh->leh_len)) { + if (unlikely(!linkea_reclen_is_valid(ldata))) { void *ptr = ldata->ld_lee; ldata->ld_leh->leh_len = sizeof(struct link_ea_header); @@ -807,7 +830,7 @@ static void lfsck_linkea_del_buf(struct linkea_data *ldata, ldata->ld_lee = NULL; } else { - linkea_del_buf(ldata, lname); + linkea_del_buf(ldata, lname, false); } } @@ -826,7 +849,10 @@ static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata, while (ldata->ld_lee != NULL) { ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) | ldata->ld_lee->lee_reclen[1]; - if (unlikely(ldata->ld_reclen == oldlen && + if (unlikely(!linkea_reclen_is_valid(ldata))) { + lfsck_linkea_del_buf(ldata, NULL); + LASSERT(ldata->ld_lee == NULL); + } else if (unlikely(ldata->ld_reclen == oldlen && memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) { repeated++; if (!remove) @@ -919,8 +945,7 @@ again: do { namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d", PFID(cfid), infix, type, idx++); - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)info->lti_key); + rc = dt_lookup_dir(env, parent, info->lti_key, &tfid); if (rc != 0 && rc != 
-ENOENT) GOTO(log, rc); @@ -935,8 +960,7 @@ again: /* Re-check whether the name conflict with othrs after taken * the ldlm lock. */ - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)info->lti_key); + rc = dt_lookup_dir(env, parent, info->lti_key, &tfid); if (rc == 0) { if (!lu_fid_eq(cfid, &tfid)) { exist = false; @@ -966,7 +990,7 @@ again: lfsck_buf_init(&linkea_buf, ldata2.ld_buf->lb_buf, ldata2.ld_leh->leh_len); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -1031,7 +1055,7 @@ again: rec->rec_type = S_IFDIR; rec->rec_fid = pfid; rc = dt_insert(env, orphan, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 1); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(unlock, rc); } @@ -1050,7 +1074,7 @@ again: rec->rec_type = lfsck_object_type(orphan) & S_IFMT; rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)cname->ln_name, th, 1); + (const struct dt_key *)cname->ln_name, th); if (rc == 0 && S_ISDIR(rec->rec_type)) { dt_write_lock(env, parent, 0); rc = dt_ref_add(env, parent, th); @@ -1086,6 +1110,99 @@ log: return rc; } +static int lfsck_lmv_set(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *obj, + struct lmv_mds_md_v1 *lmv) +{ + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lu_buf buf = { lmv, sizeof(*lmv) }; + int rc; + + ENTRY; + + th = lfsck_trans_create(env, dev, lfsck); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = dt_declare_xattr_set(env, obj, &buf, XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(stop, rc); + + EXIT; +stop: + dt_trans_stop(env, dev, th); + + return rc; +} + +static int lfsck_lmv_delete(const struct lu_env *env, + struct lfsck_instance *lfsck, 
+ struct dt_object *obj) +{ + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + int rc; + + ENTRY; + + th = lfsck_trans_create(env, dev, lfsck); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th); + if (rc) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th); + if (rc) + GOTO(stop, rc); + + EXIT; +stop: + dt_trans_stop(env, dev, th); + + return rc; +} + +static inline int lfsck_object_is_shard(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *obj, + const struct lu_name *lname) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lmv_mds_md_v1 *lmv = &info->lti_lmv; + int rc; + + rc = lfsck_shard_name_to_index(env, lname->ln_name, lname->ln_namelen, + lfsck_object_type(obj), + lfsck_dto2fid(obj)); + if (rc < 0) + return 0; + + rc = lfsck_read_stripe_lmv(env, lfsck, obj, lmv); + if (rc == -ENODATA) + return 0; + + if (!rc && lmv->lmv_magic == LMV_MAGIC_STRIPE) + return 1; + + return rc; +} + /** * Add the specified name entry back to namespace. * @@ -1096,13 +1213,17 @@ log: * it is quite possible that the name entry is lost. Then the LFSCK * should add the name entry back to the namespace. * + * If \a child is shard, which means \a parent is a striped directory, + * if \a parent has LMV, we need to delete it before insertion because + * now parent's striping is broken and can't be parsed correctly. 
+ * * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * \param[in] parent pointer to the directory under which the name entry * will be inserted into * \param[in] child pointer to the object referenced by the name entry * that to be inserted into the parent - * \param[in] name the name for the child in the parent directory + * \param[in] lname the name for the child in the parent directory * * \retval positive number for repaired cases * \retval 0 if nothing to be repaired @@ -1112,19 +1233,26 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, struct lfsck_component *com, struct dt_object *parent, struct dt_object *child, - const char *name) + const struct lu_name *lname) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_attr *la = &info->lti_la; - struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lfsck_instance *lfsck = com->lc_lfsck; /* The child and its name may be on different MDTs. 
*/ - const struct lu_fid *pfid = lfsck_dto2fid(parent); - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct dt_device *dev = lfsck->li_next; - struct thandle *th = NULL; - struct lfsck_lock_handle *llh = &info->lti_llh; - int rc = 0; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct lmv_mds_md_v1 *lmv = &info->lti_lmv; + struct lu_buf buf = { lmv, sizeof(*lmv) }; + /* whether parent's LMV is deleted before insertion */ + bool parent_lmv_deleted = false; + /* whether parent's LMV is missing */ + bool parent_lmv_lost = false; + int rc = 0; + ENTRY; /* @parent/@child may be based on lfsck->li_bottom, @@ -1134,9 +1262,6 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, if (IS_ERR(parent)) GOTO(log, rc = PTR_ERR(parent)); - if (unlikely(!dt_try_as_dir(env, parent))) - GOTO(log, rc = -ENOTDIR); - child = lfsck_object_locate(dev, child); if (IS_ERR(child)) GOTO(log, rc = PTR_ERR(child)); @@ -1144,19 +1269,65 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(log, rc = 1); - rc = lfsck_lock(env, lfsck, parent, name, llh, - MDS_INODELOCK_UPDATE, LCK_PW); - if (rc != 0) + rc = lfsck_lock(env, lfsck, parent, lname->ln_name, llh, + MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc) GOTO(log, rc); - th = dt_trans_create(env, dev); + rc = lfsck_object_is_shard(env, lfsck, child, lname); + if (rc < 0) + GOTO(unlock, rc); + + if (rc == 1) { + rc = lfsck_read_stripe_lmv(env, lfsck, parent, lmv); + if (!rc) { + /* + * To add a shard, we need to convert parent to a + * plain directory by deleting its LMV, and after + * insertion set it back. 
+ */ + rc = lfsck_lmv_delete(env, lfsck, parent); + if (rc) + GOTO(unlock, rc); + parent_lmv_deleted = true; + lmv->lmv_layout_version++; + lfsck_lmv_header_cpu_to_le(lmv, lmv); + } else if (rc == -ENODATA) { + struct lu_seq_range *range = &info->lti_range; + struct seq_server_site *ss = lfsck_dev_site(lfsck); + + rc = lfsck_read_stripe_lmv(env, lfsck, child, lmv); + if (rc) + GOTO(unlock, rc); + + fld_range_set_mdt(range); + rc = fld_server_lookup(env, ss->ss_server_fld, + fid_seq(lfsck_dto2fid(parent)), range); + if (rc) + GOTO(unlock, rc); + + parent_lmv_lost = true; + lmv->lmv_magic = LMV_MAGIC; + lmv->lmv_master_mdt_index = range->lsr_index; + lmv->lmv_layout_version++; + lfsck_lmv_header_cpu_to_le(lmv, lmv); + } else { + GOTO(unlock, rc); + } + } + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(unlock, rc = -ENOTDIR); + + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(unlock, rc = PTR_ERR(th)); rec->rec_type = lfsck_object_type(child) & S_IFMT; rec->rec_fid = cfid; rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th); + (const struct dt_key *)lname->ln_name, th); if (rc != 0) GOTO(stop, rc); @@ -1166,7 +1337,13 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, GOTO(stop, rc); } - memset(la, 0, sizeof(*la)); + if (parent_lmv_lost) { + rc = dt_declare_xattr_set(env, parent, &buf, XATTR_NAME_LMV, + 0, th); + if (rc) + GOTO(stop, rc); + } + la->la_ctime = ktime_get_real_seconds(); la->la_valid = LA_CTIME; rc = dt_declare_attr_set(env, parent, la, th); @@ -1182,7 +1359,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, GOTO(stop, rc); rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, 1); + (const struct dt_key *)lname->ln_name, th); if (rc != 0) GOTO(stop, rc); @@ -1194,7 +1371,12 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, GOTO(stop, rc); } - la->la_ctime = ktime_get_real_seconds(); + if 
(parent_lmv_lost) { + rc = dt_xattr_set(env, parent, &buf, XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(stop, rc); + } + rc = dt_attr_set(env, parent, la, th); if (rc != 0) GOTO(stop, rc); @@ -1207,12 +1389,15 @@ stop: dt_trans_stop(env, dev, th); unlock: + if (parent_lmv_deleted) + lfsck_lmv_set(env, lfsck, parent, lmv); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with " "the name %s and type %o to the parent "DFID": rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(cfid), name, + lfsck_lfsck2name(lfsck), PFID(cfid), lname->ln_name, lfsck_object_type(child) & S_IFMT, PFID(pfid), rc); if (rc != 0) { @@ -1279,16 +1464,15 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, GOTO(log, rc = 1); if (dt_object_remote(orphan)) { - LASSERT(lfsck->li_lpf_root_obj != NULL); + if (lfsck->li_lpf_root_obj == NULL) + GOTO(log, rc = -EBADF); idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid); if (idx < 0) GOTO(log, rc = idx); snprintf(name, 8, "MDT%04x", idx); - rc = dt_lookup(env, lfsck->li_lpf_root_obj, - (struct dt_rec *)&tfid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, lfsck->li_lpf_root_obj, name, &tfid); if (rc != 0) GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc)); @@ -1315,8 +1499,7 @@ again: do { namelen = snprintf(name, 31, DFID"-P-%d", PFID(cfid), idx++); - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, parent, name, &tfid); if (rc != 0 && rc != -ENOENT) GOTO(log, rc); } while (rc == 0); @@ -1328,8 +1511,7 @@ again: /* Re-check whether the name conflict with othrs after taken * the ldlm lock. 
*/ - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, parent, name, &tfid); if (unlikely(rc == 0)) { lfsck_unlock(llh); goto again; @@ -1357,7 +1539,7 @@ again: if (rc != 0) GOTO(unlock1, rc); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); @@ -1395,8 +1577,8 @@ again: lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck); lfsck_lmv_header_cpu_to_le(lmv2, lmv); lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2)); - rc = dt_declare_xattr_set(env, orphan, &lmv_buf, - XATTR_NAME_LMV, 0, th); + rc = dt_declare_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV, + 0, th); if (rc != 0) GOTO(stop, rc); } @@ -1432,13 +1614,13 @@ again: rec->rec_fid = cfid; rc = dt_insert(env, orphan, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 1); + (const struct dt_key *)dot, th); if (rc != 0) GOTO(unlock2, rc); rec->rec_fid = lfsck_dto2fid(parent); rc = dt_insert(env, orphan, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 1); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(unlock2, rc); @@ -1456,7 +1638,7 @@ again: rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, 1); + (const struct dt_key *)name, th); if (rc == 0) { dt_write_lock(env, parent, 0); rc = dt_ref_add(env, parent, th); @@ -1549,7 +1731,7 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env, } again: - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); @@ -1587,16 +1769,22 @@ again: lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid, true); - if (buflen < ldata_new.ld_leh->leh_len) { + /* + * linkea may change because it doesn't take lock in the first read, if + * it becomes larger, restart from beginning. 
+ */ + if ((ldata_new.ld_leh->leh_reccount > 0 || + unlikely(ldata_new.ld_leh->leh_overflow_time)) && + buflen < ldata_new.ld_leh->leh_len) { dt_write_unlock(env, obj); dt_trans_stop(env, dev, th); lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf, ldata_new.ld_leh->leh_len); + buflen = linkea_buf.lb_len; goto again; } - if (ldata_new.ld_leh->leh_reccount > 0 || - unlikely(ldata->ld_leh->leh_overflow_time)) + if (buflen) rc = lfsck_links_write(env, obj, &ldata_new, th); else rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th); @@ -1795,8 +1983,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, goto replace; } - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, parent, name, &tfid); if (rc == -ENOENT) { exist = false; goto replace; @@ -1859,7 +2046,7 @@ replace: if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(log, rc = 1); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -1896,7 +2083,7 @@ replace: GOTO(stop, rc); rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, 1); + (const struct dt_key *)name, th); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -1945,7 +2132,7 @@ int lfsck_namespace_rebuild_linkea(const struct lu_env *env, int rc = 0; ENTRY; - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -2023,7 +2210,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct dt_insert_rec *rec = &info->lti_dt_rec; const struct lu_fid *pfid = lfsck_dto2fid(parent); - const struct lu_fid *cfid = lfsck_dto2fid(child); + struct lu_fid cfid = {0}; struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_device *dev = lfsck->li_next; @@ -2033,6 +2220,8 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, int rc = 0; ENTRY; + if (child) + cfid = *lfsck_dto2fid(child); parent = lfsck_object_locate(dev, parent); if (IS_ERR(parent)) GOTO(log, rc = PTR_ERR(parent)); @@ -2049,7 +2238,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (rc != 0) GOTO(log, rc); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); @@ -2059,7 +2248,8 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (update) { rec->rec_type = lfsck_object_type(child) & S_IFMT; - rec->rec_fid = cfid; + LASSERT(!fid_is_zero(&cfid)); + rec->rec_fid = &cfid; rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, (const struct dt_key *)name2, th); @@ -2079,8 +2269,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, dt_write_lock(env, parent, 0); - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name); + rc = dt_lookup_dir(env, dt_object_child(parent), name, &tfid); /* Someone has removed the bad name entry by race. */ if (rc == -ENOENT) GOTO(unlock2, rc = 0); @@ -2090,7 +2279,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, /* Someone has removed the bad name entry and reused it for other * object by race. 
*/ - if (!lu_fid_eq(&tfid, cfid)) + if (!lu_fid_eq(&tfid, &cfid)) GOTO(unlock2, rc = 0); if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) @@ -2103,7 +2292,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (update) { rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name2, th, 1); + (const struct dt_key *)name2, th); if (rc != 0) GOTO(unlock2, rc); } @@ -2125,8 +2314,8 @@ stop: /* We are not sure whether the child will become orphan or not. * Record it in the LFSCK trace file for further checking in * the second-stage scanning. */ - if (!update && !dec && rc == 0) - lfsck_namespace_trace_update(env, com, cfid, + if (!update && !dec && child && rc == 0) + lfsck_namespace_trace_update(env, com, &cfid, LNTF_CHECK_LINKEA, true); unlock1: @@ -2139,7 +2328,7 @@ log: "entry for: parent "DFID", child "DFID", name %s, type " "in name entry %o, type claimed by child %o. repair it " "by %s with new name2 %s: rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid), + lfsck_lfsck2name(lfsck), PFID(pfid), PFID(&cfid), name, type, update ? lfsck_object_type(child) : 0, update ? 
"updating" : "removing", name2, rc); @@ -2196,7 +2385,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf, ldata.ld_leh->leh_len); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -2231,7 +2420,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, dt_delete(env, obj, (const struct dt_key *)dotdot, th); rc = dt_insert(env, obj, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 1); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(unlock, rc); @@ -2411,7 +2600,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, lost_parent: lmv = &info->lti_lmv; - rc = lfsck_read_stripe_lmv(env, child, lmv); + rc = lfsck_read_stripe_lmv(env, lfsck, child, lmv); if (rc != 0 && rc != -ENODATA) GOTO(out, rc); @@ -2440,7 +2629,7 @@ lost_parent: if (rc >= 0) { /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, - child, cname->ln_name); + child, cname); if (unlikely(rc == -EEXIST)) { /* Unfortunately, someone reused the name * under the parent by race. So we have @@ -2478,8 +2667,7 @@ lost_parent: GOTO(out, rc); } - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)cname->ln_name); + rc = dt_lookup_dir(env, parent, cname->ln_name, &tfid); if (rc == -ENOENT) { /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT * has ever tried to verify some remote MDT-object that resides @@ -2498,7 +2686,8 @@ lost_parent: } lfsck_ibits_unlock(lh, LCK_EX); - rc = lfsck_namespace_check_name(env, parent, child, cname); + rc = lfsck_namespace_check_name(env, lfsck, parent, child, + cname); if (rc == -ENOENT) goto lost_parent; @@ -2522,7 +2711,7 @@ lost_parent: /* Add the missing name entry back to the namespace. 
*/ rc = lfsck_namespace_insert_normal(env, com, parent, child, - cname->ln_name); + cname); if (unlikely(rc == -ESTALE)) /* It may happen when the remote object has been * removed, but the local MDT is not aware of that. */ @@ -2694,8 +2883,20 @@ again: } parent = lfsck_object_find_bottom(env, lfsck, &tfid); - if (IS_ERR(parent)) - RETURN(PTR_ERR(parent)); + if (IS_ERR(parent)) { + rc = PTR_ERR(parent); + /* if @pfid doesn't have a valid OI mapping, it will + * trigger OI scrub, and -ENOENT is returned if it's + * remote, -EINPROGRESS if local. + */ + if ((rc == -ENOENT || rc == -EINPROGRESS) && + ldata->ld_leh->leh_reccount > 1) { + lfsck_linkea_del_buf(ldata, cname); + continue; + } + + RETURN(rc); + } if (!dt_object_exists(parent)) { lfsck_object_put(env, parent); @@ -2718,8 +2919,7 @@ again: continue; } - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)cname->ln_name); + rc = dt_lookup_dir(env, parent, cname->ln_name, &tfid); *pfid2 = *lfsck_dto2fid(parent); if (rc == -ENOENT) { lfsck_object_put(env, parent); @@ -2908,7 +3108,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, if (rc != 0) GOTO(log, rc); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -3049,11 +3249,8 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env, if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) && !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { - CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the" - "the namespace LFSCK, then the LFSCK cannot guarantee" - "all the name entries have been verified in first-stage" - "scanning. So have to skip orphan related handling for" - "the directory object "DFID" with remote name entry\n", + CDEBUG(D_LFSCK, + "%s: some MDT(s) maybe NOT take part in the the namespace LFSCK, then the LFSCK cannot guarantee all the name entries have been verified in first-stage scanning. 
So have to skip orphan related handling for the directory object "DFID" with remote name entry\n", lfsck_lfsck2name(lfsck), PFID(cfid)); RETURN(0); @@ -3081,8 +3278,7 @@ lock: GOTO(out, rc = 0); } - rc = dt_lookup(env, child, (struct dt_rec *)pfid, - (const struct dt_key *)dotdot); + rc = dt_lookup_dir(env, child, dotdot, pfid); if (rc != 0) { if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) { dt_read_unlock(env, child); @@ -3272,7 +3468,7 @@ static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env, if (rc != 0) GOTO(log, rc); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -3298,10 +3494,6 @@ static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 1); - /* If all known entries are in the linkEA, then the 'leh_reccount' - * should NOT be zero. */ - LASSERT(ldata->ld_leh->leh_reccount > 0); - lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len); rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, 0, th); @@ -3328,6 +3520,146 @@ log: } /** + * Verify the object's agent entry. + * + * If the object claims to have agent entry but the linkEA does not contain + * remote parent, then remove the agent entry. Otherwise, if the object has + * no agent entry but its linkEA contains remote parent, then will generate + * agent entry for it. 
+ * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] obj pointer to the dt_object to be handled + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +static int lfsck_namespace_check_agent_entry(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *obj) +{ + struct linkea_data ldata = { NULL }; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lu_fid *pfid = &info->lti_fid2; + struct lu_name *cname = &info->lti_name; + struct lu_seq_range *range = &info->lti_range; + struct seq_server_site *ss = lfsck_dev_site(lfsck); + __u32 idx = lfsck_dev_idx(lfsck); + int rc; + bool remote = false; + ENTRY; + + if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) + RETURN(0); + + rc = lfsck_links_read_with_rec(env, obj, &ldata); + if (rc == -ENOENT || rc == -ENODATA) + RETURN(0); + + if (rc && rc != -EINVAL) + GOTO(out, rc); + + /* We check the agent entry again after verifying the linkEA + * successfully. So invalid linkEA should be dryrun mode. */ + if (rc == -EINVAL || unlikely(!ldata.ld_leh->leh_reccount)) + RETURN(0); + + linkea_first_entry(&ldata); + while (ldata.ld_lee != NULL && !remote) { + linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, + cname, pfid); + if (!linkea_entry_is_valid(&ldata, cname, pfid)) + GOTO(out, rc = 0); + + fld_range_set_mdt(range); + rc = fld_server_lookup(env, ss->ss_server_fld, + fid_seq(pfid), range); + if (rc) + GOTO(out, rc = (rc == -ENOENT ? 
0 : rc)); + + if (range->lsr_index != idx) + remote = true; + else + linkea_next_entry(&ldata); + } + + if ((lu_object_has_agent_entry(&obj->do_lu) && !remote) || + (!lu_object_has_agent_entry(&obj->do_lu) && remote)) { + struct dt_device *dev = lfsck_obj2dev(obj); + struct linkea_data ldata2 = { NULL }; + struct lustre_handle lh = { 0 }; + struct lu_buf linkea_buf; + struct thandle *handle; + + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(out, rc = 1); + + rc = lfsck_ibits_lock(env, lfsck, obj, &lh, + MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc) + GOTO(out, rc); + + handle = lfsck_trans_create(env, dev, lfsck); + if (IS_ERR(handle)) + GOTO(unlock, rc = PTR_ERR(handle)); + + lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf, + ldata.ld_leh->leh_len); + rc = dt_declare_xattr_set(env, obj, &linkea_buf, + XATTR_NAME_LINK, LU_XATTR_REPLACE, handle); + if (rc) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, handle); + if (rc) + GOTO(stop, rc); + + dt_write_lock(env, obj, 0); + rc = lfsck_links_read2_with_rec(env, obj, &ldata2); + if (rc) { + if (rc == -ENOENT || rc == -ENODATA) + rc = 0; + GOTO(unlock2, rc); + } + + /* If someone changed linkEA by race, then the agent + * entry will be updated by lower layer automatically. */ + if (ldata.ld_leh->leh_len != ldata2.ld_leh->leh_len || + memcmp(ldata.ld_buf->lb_buf, ldata2.ld_buf->lb_buf, + ldata.ld_leh->leh_len) != 0) + GOTO(unlock2, rc = 0); + + rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, + LU_XATTR_REPLACE, handle); + if (!rc) + rc = 1; + + GOTO(unlock2, rc); + +unlock2: + dt_write_unlock(env, obj); +stop: + dt_trans_stop(env, dev, handle); +unlock: + lfsck_ibits_unlock(&lh, LCK_EX); + } + + GOTO(out, rc); + +out: + if (rc > 0) + ns->ln_agent_entries_repaired++; + if (rc) + CDEBUG(D_LFSCK, "%s: repair agent entry for "DFID": rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc); + return rc; +} + +/** * Double scan the MDT-object for namespace LFSCK. 
* * If the MDT-object contains invalid or repeated linkEA entries, then drop @@ -3374,6 +3706,8 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, if (S_ISDIR(lfsck_object_type(child))) { dt_read_unlock(env, child); rc = lfsck_namespace_double_scan_dir(env, com, child, flags); + if (!rc && flags & LNTF_CHECK_AGENT_ENTRY) + rc = lfsck_namespace_check_agent_entry(env, com, child); RETURN(rc); } @@ -3441,12 +3775,22 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, repaired = true; - /* fall through */ + /* fallthrough */ } parent = lfsck_object_find_bottom(env, lfsck, pfid); - if (IS_ERR(parent)) - GOTO(out, rc = PTR_ERR(parent)); + if (IS_ERR(parent)) { + rc = PTR_ERR(parent); + /* if @pfid doesn't have a valid OI mapping, it will + * trigger OI scrub, and -ENOENT is returned if it's + * remote, -EINPROGRESS if local. + */ + if ((rc == -ENOENT || rc == -EINPROGRESS) && + ldata.ld_leh->leh_reccount > 1) + rc = lfsck_namespace_shrink_linkea(env, com, + child, &ldata, cname, pfid, true); + GOTO(out, rc); + } if (!dt_object_exists(parent)) { @@ -3489,7 +3833,7 @@ lost_parent: /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, - parent, child, cname->ln_name); + parent, child, cname); if (unlikely(rc == -EEXIST)) /* Unfortunately, someone reused the * name under the parent by race. So we @@ -3530,8 +3874,7 @@ lost_parent: continue; } - rc = dt_lookup(env, parent, (struct dt_rec *)cfid, - (const struct dt_key *)cname->ln_name); + rc = dt_lookup_dir(env, parent, cname->ln_name, cfid); if (rc != 0 && rc != -ENOENT) { lfsck_object_put(env, parent); @@ -3609,7 +3952,8 @@ lost_parent: GOTO(out, rc = 0); } - rc = lfsck_namespace_check_name(env, parent, child, cname); + rc = lfsck_namespace_check_name(env, lfsck, parent, child, + cname); if (rc == -ENOENT) goto lost_parent; @@ -3635,7 +3979,7 @@ lost_parent: /* Add the missing name entry back to the namespace. 
*/ rc = lfsck_namespace_insert_normal(env, com, parent, child, - cname->ln_name); + cname); if (unlikely(rc == -ESTALE)) /* It may happen when the remote object has been * removed, but the local MDT is not aware of that. */ @@ -3733,6 +4077,9 @@ out: rc = 1; } + if (!rc && flags & LNTF_CHECK_AGENT_ENTRY) + rc = lfsck_namespace_check_agent_entry(env, com, child); + return rc; } @@ -3778,6 +4125,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, "striped_shards_skipped: %llu\n" "name_hash_%s: %llu\n" "linkea_overflow_%s: %llu\n" + "agent_entries_%s: %llu\n" "success_count: %u\n" "run_time_phase1: %lld seconds\n" "run_time_phase2: %lld seconds\n", @@ -3817,6 +4165,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, postfix, ns->ln_name_hash_repaired, dryrun ? "inconsistent" : "cleared", ns->ln_linkea_overflow_cleared, + postfix, ns->ln_agent_entries_repaired, ns->ln_success_count, time_phase1, time_phase2); @@ -3917,7 +4266,7 @@ static int lfsck_namespace_reset(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - lad->lad_incomplete = 0; + clear_bit(LAD_INCOMPLETE, &lad->lad_flags); CFS_RESET_BITMAP(lad->lad_bitmap); rc = lfsck_namespace_store(env, com); @@ -3957,14 +4306,19 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_lmv *llmv = lfsck->li_lmv; struct lfsck_namespace_req *lnr; - __u32 size = - sizeof(*lnr) + LFSCK_TMPBUF_LEN; - bool wakeup = false; + struct lu_attr *la = &lfsck_env_info(env)->lti_la2; + __u32 size = sizeof(*lnr) + LFSCK_TMPBUF_LEN; + int rc; + bool wakeup = false; ENTRY; if (llmv == NULL) RETURN_EXIT; + rc = dt_attr_get(env, lfsck->li_obj_dir, la); + if (rc) + RETURN_EXIT; + OBD_ALLOC(lnr, size); if (lnr == NULL) { ns->ln_striped_dirs_skipped++; @@ -3973,7 +4327,7 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, } lso = lfsck_assistant_object_init(env, lfsck_dto2fid(lfsck->li_obj_dir), - NULL, 
lfsck->li_pos_current.lp_oit_cookie, true); + la, lfsck->li_pos_current.lp_oit_cookie, true); if (IS_ERR(lso)) { OBD_FREE(lnr, size); ns->ln_striped_dirs_skipped++; @@ -3989,9 +4343,12 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, lnr->lnr_fid = *lfsck_dto2fid(lfsck->li_obj_dir); lnr->lnr_dir_cookie = MDS_DIR_END_OFF; lnr->lnr_size = size; + lnr->lnr_type = lso->lso_attr.la_mode; spin_lock(&lad->lad_lock); - if (lad->lad_assistant_status < 0) { + if (lad->lad_assistant_status < 0 || + unlikely(!thread_is_running(&lfsck->li_thread) || + !thread_is_running(&lad->lad_thread))) { spin_unlock(&lad->lad_lock); lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar); ns->ln_striped_dirs_skipped++; @@ -4006,7 +4363,7 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, lad->lad_prefetched++; spin_unlock(&lad->lad_lock); if (wakeup) - wake_up_all(&lad->lad_thread.t_ctl_waitq); + wake_up(&lad->lad_thread.t_ctl_waitq); EXIT; } @@ -4183,19 +4540,32 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, struct lfsck_component *com, struct dt_object *obj) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lfsck_namespace *ns = com->lc_file_ram; - struct lfsck_instance *lfsck = com->lc_lfsck; - const struct lu_fid *fid = lfsck_dto2fid(obj); - struct lu_fid *pfid = &info->lti_fid2; - struct lu_name *cname = &info->lti_name; - struct lu_seq_range *range = &info->lti_range; - struct seq_server_site *ss = lfsck_dev_site(lfsck); - struct linkea_data ldata = { NULL }; - __u32 idx = lfsck_dev_idx(lfsck); - int rc; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + const struct lu_fid *fid = lfsck_dto2fid(obj); + struct lu_fid *pfid = &info->lti_fid2; + struct lu_name *cname = &info->lti_name; + struct lu_seq_range *range = &info->lti_range; + struct seq_server_site *ss = lfsck_dev_site(lfsck); + struct linkea_data ldata = { 
NULL }; + __u32 idx = lfsck_dev_idx(lfsck); + struct lu_attr la = { .la_valid = 0 }; + bool remote = false; + int rc; ENTRY; + rc = dt_attr_get(env, obj, &la); + if (unlikely(rc || (la.la_valid & LA_FLAGS && + la.la_flags & LUSTRE_ORPHAN_FL))) { + CDEBUG(D_INFO, + "%s: skip orphan "DFID", %llx/%x: rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(fid), + la.la_valid, la.la_flags, rc); + + return rc; + } + rc = lfsck_links_read(env, obj, &ldata); if (rc == -ENOENT) GOTO(out, rc = 0); @@ -4220,14 +4590,47 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, GOTO(out, rc = (rc == -ENOENT ? 0 : rc)); } + if (rc && rc != -ENODATA) + GOTO(out, rc); + if (rc == -ENODATA || unlikely(!ldata.ld_leh->leh_reccount)) { rc = lfsck_namespace_check_for_double_scan(env, com, obj); GOTO(out, rc); } - if (rc != 0) - GOTO(out, rc); + linkea_first_entry(&ldata); + while (ldata.ld_lee != NULL) { + linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, + cname, pfid); + if (!fid_is_sane(pfid)) { + rc = lfsck_namespace_trace_update(env, com, fid, + LNTF_CHECK_PARENT, true); + } else if (!linkea_entry_is_valid(&ldata, cname, pfid)) { + GOTO(out, rc); + } else { + fld_range_set_mdt(range); + rc = fld_server_lookup(env, ss->ss_server_fld, + fid_seq(pfid), range); + if ((rc == -ENOENT) || + (!rc && range->lsr_index != idx)) { + remote = true; + break; + } + } + if (rc) + GOTO(out, rc); + + linkea_next_entry(&ldata); + } + + if ((lu_object_has_agent_entry(&obj->do_lu) && !remote) || + (!lu_object_has_agent_entry(&obj->do_lu) && remote)) { + rc = lfsck_namespace_trace_update(env, com, fid, + LNTF_CHECK_AGENT_ENTRY, true); + if (rc) + GOTO(out, rc); + } /* Record multiple-linked object. 
*/ if (ldata.ld_leh->leh_reccount > 1) { @@ -4237,23 +4640,11 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, GOTO(out, rc); } - linkea_first_entry(&ldata); - linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid); - if (!fid_is_sane(pfid)) { + if (remote) rc = lfsck_namespace_trace_update(env, com, fid, - LNTF_CHECK_PARENT, true); - } else { - fld_range_set_mdt(range); - rc = fld_local_lookup(env, ss->ss_server_fld, - fid_seq(pfid), range); - if ((rc == -ENOENT) || - (rc == 0 && range->lsr_index != idx)) - rc = lfsck_namespace_trace_update(env, com, fid, - LNTF_CHECK_LINKEA, true); - else - rc = lfsck_namespace_check_for_double_scan(env, com, - obj); - } + LNTF_CHECK_LINKEA, true); + else + rc = lfsck_namespace_check_for_double_scan(env, com, obj); GOTO(out, rc); @@ -4279,17 +4670,15 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct ptlrpc_thread *mthread = &lfsck->li_thread; struct ptlrpc_thread *athread = &lad->lad_thread; - struct l_wait_info lwi = { 0 }; bool wakeup = false; - l_wait_event(mthread->t_ctl_waitq, - lad->lad_prefetched < bk->lb_async_windows || - !thread_is_running(mthread) || - thread_is_stopped(athread), - &lwi); + wait_event_idle(mthread->t_ctl_waitq, + lad->lad_prefetched < bk->lb_async_windows || + !thread_is_running(mthread) || + !thread_is_running(athread)); - if (unlikely(!thread_is_running(mthread)) || - thread_is_stopped(athread)) + if (unlikely(!thread_is_running(mthread) || + !thread_is_running(athread))) return 0; if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir))) @@ -4304,7 +4693,9 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, } spin_lock(&lad->lad_lock); - if (lad->lad_assistant_status < 0) { + if (lad->lad_assistant_status < 0 || + unlikely(!thread_is_running(mthread) || + !thread_is_running(athread))) { spin_unlock(&lad->lad_lock); lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar); return 
lad->lad_assistant_status; @@ -4317,7 +4708,7 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, lad->lad_prefetched++; spin_unlock(&lad->lad_lock); if (wakeup) - wake_up_all(&lad->lad_thread.t_ctl_waitq); + wake_up(&lad->lad_thread.t_ctl_waitq); down_write(&com->lc_sem); com->lc_new_checked++; @@ -4427,10 +4818,10 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, time64_t rtime = ns->ln_run_time_phase1 + duration; if (duration != 0) - div_u64(new_checked, duration); + new_checked = div64_s64(new_checked, duration); if (rtime != 0) - div_u64(speed, rtime); + speed = div64_s64(speed, rtime); lfsck_namespace_dump_statistics(m, ns, checked, 0, rtime, 0, bk->lb_param & LPF_DRYRUN); @@ -4488,20 +4879,20 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, time64_t time0 = ns->ln_run_time_phase1 + rtime; if (duration != 0) - div_u64(new_checked, duration); + new_checked = div64_s64(new_checked, duration); if (ns->ln_run_time_phase1 != 0) - div_u64(speed1, ns->ln_run_time_phase1); + speed1 = div64_s64(speed1, ns->ln_run_time_phase1); else if (ns->ln_items_checked != 0) time0++; if (rtime != 0) - div_u64(speed2, rtime); + speed2 = div64_s64(speed2, rtime); else if (checked != 0) time0++; if (time0 != 0) - div_u64(speed0, time0); + speed0 = div64_s64(speed0, time0); lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked, checked, @@ -4525,17 +4916,17 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, time64_t time0 = ns->ln_run_time_phase1 + ns->ln_run_time_phase2; if (ns->ln_run_time_phase1 != 0) - div_u64(speed1, ns->ln_run_time_phase1); + speed1 = div64_s64(speed1, ns->ln_run_time_phase1); else if (ns->ln_items_checked != 0) time0++; if (ns->ln_run_time_phase2 != 0) - div_u64(speed2, ns->ln_run_time_phase2); + speed2 = div64_s64(speed2, ns->ln_run_time_phase2); else if (ns->ln_objs_checked_phase2 != 0) time0++; if (time0 != 0) - div_u64(speed0, time0); + speed0 = 
div64_s64(speed0, time0); lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked, ns->ln_objs_checked_phase2, @@ -4759,7 +5150,7 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, stop->ls_flags = lr->lr_param & ~LPF_BROADCAST; lfsck_stop(env, lfsck->li_bottom, stop); } else if (lfsck_phase2_next_ready(lad)) { - wake_up_all(&lad->lad_thread.t_ctl_waitq); + wake_up(&lad->lad_thread.t_ctl_waitq); } RETURN(0); @@ -4842,7 +5233,7 @@ static int lfsck_namespace_query(const struct lu_env *env, return rc; } -static struct lfsck_operations lfsck_namespace_ops = { +static const struct lfsck_operations lfsck_namespace_ops = { .lfsck_reset = lfsck_namespace_reset, .lfsck_fail = lfsck_namespace_fail, .lfsck_close_dir = lfsck_namespace_close_dir, @@ -4892,26 +5283,28 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, struct dt_object *child, struct lfsck_namespace_req *lnr) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_attr *la = &info->lti_la; - struct dt_allocation_hint *hint = &info->lti_hint; - struct dt_object_format *dof = &info->lti_dof; - struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; - const struct lu_name *cname; - const struct lu_fid *pfid = lfsck_dto2fid(parent); - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct linkea_data ldata = { NULL }; - struct lfsck_lock_handle *llh = &info->lti_llh; - struct lu_buf linkea_buf; - struct lu_buf lmv_buf; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; - struct dt_device *dev = lfsck->li_next; - struct thandle *th = NULL; - int rc = 0; - __u16 type = lnr->lnr_type; - bool create; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la; + struct dt_allocation_hint *hint = &info->lti_hint; + struct dt_object_format *dof = &info->lti_dof; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lmv_mds_md_v1 *lmv2 = 
&info->lti_lmv2; + const struct lu_name *cname; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct linkea_data ldata = { NULL }; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct lustre_handle rlh = { 0 }; + struct lustre_handle clh = { 0 }; + struct lu_buf linkea_buf; + struct lu_buf lmv_buf; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + int rc = 0; + __u16 type = lnr->lnr_type; + bool create; ENTRY; cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen); @@ -4943,7 +5336,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, GOTO(log, rc); rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh, - MDS_INODELOCK_UPDATE, LCK_PR); + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -4951,17 +5344,40 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(log, rc); + if (dt_object_remote(child)) { + rc = lfsck_remote_lookup_lock(env, lfsck, parent, child, &rlh, + LCK_EX); + if (rc != 0) + GOTO(log, rc); + } + + rc = lfsck_ibits_lock(env, lfsck, child, &clh, + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc != 0) + GOTO(unlock_remote_lookup, rc); + /* Set the ctime as zero, then others can know it is created for * repairing dangling name entry by LFSCK. And if the LFSCK made * wrong decision and the real MDT-object has been found later, * then the LFSCK has chance to fix the incosistency properly. 
*/ memset(la, 0, sizeof(*la)); - la->la_mode = (type & S_IFMT) | 0600; - la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | - LA_ATIME | LA_MTIME | LA_CTIME; - - child->do_ops->do_ah_init(env, hint, parent, child, - la->la_mode & S_IFMT); + if (S_ISDIR(type)) + la->la_mode = (type & S_IFMT) | 0700; + else + la->la_mode = (type & S_IFMT) | 0600; + la->la_valid = LA_TYPE | LA_MODE | LA_CTIME; + + /* + * if it's directory, skip do_ah_init() to create a plain directory + * because it may have shards already, which will be inserted back + * later, besides, it may be remote, and creating stripe directory + * remotely is not supported. + */ + if (S_ISREG(type)) + child->do_ops->do_ah_init(env, hint, parent, child, type); + else if (S_ISDIR(type)) + child->do_ops->do_ah_init(env, hint, NULL, child, type); memset(dof, 0, sizeof(*dof)); dof->dof_type = dt_mode_to_dft(type); @@ -4969,9 +5385,9 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, * the MDT-object without stripes (dof->dof_reg.striped = 0). related * OST-objects will be created when write open. */ - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) - GOTO(log, rc = PTR_ERR(th)); + GOTO(unlock_child, rc = PTR_ERR(th)); /* 1a. create child. */ rc = dt_declare_create(env, child, la, hint, dof, th); @@ -5035,6 +5451,21 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); + /* 7a. if child is remote, delete and insert to generate local agent */ + if (dt_object_remote(child)) { + rc = dt_declare_delete(env, parent, + (const struct dt_key *)lnr->lnr_name, + th); + if (rc) + GOTO(stop, rc); + + rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)lnr->lnr_name, + th); + if (rc) + GOTO(stop, rc); + } + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc = (rc == -EEXIST ? 
1 : rc)); @@ -5055,14 +5486,14 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, rec->rec_type = S_IFDIR; rec->rec_fid = cfid; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 1); + (const struct dt_key *)dot, th); if (rc != 0) GOTO(unlock, rc); /* 4b. insert dotdot into child dir */ rec->rec_fid = pfid; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 1); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(unlock, rc); @@ -5078,6 +5509,23 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, /* 6b. insert linkEA for child. */ rc = dt_xattr_set(env, child, &linkea_buf, XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(unlock, rc); + + /* 7b. if child is remote, delete and insert to generate local agent */ + if (dt_object_remote(child)) { + rc = dt_delete(env, parent, + (const struct dt_key *)lnr->lnr_name, th); + if (rc) + GOTO(unlock, rc); + + rec->rec_type = type; + rec->rec_fid = cfid; + rc = dt_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)lnr->lnr_name, th); + if (rc) + GOTO(unlock, rc); + } GOTO(unlock, rc); @@ -5087,6 +5535,11 @@ unlock: stop: dt_trans_stop(env, dev, th); +unlock_child: + lfsck_ibits_unlock(&clh, LCK_EX); +unlock_remote_lookup: + if (dt_object_remote(child)) + lfsck_ibits_unlock(&rlh, LCK_EX); log: lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling " @@ -5119,7 +5572,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, const struct lu_name *cname; struct thandle *handle = NULL; struct lfsck_namespace_req *lnr = - container_of0(lar, struct lfsck_namespace_req, lnr_lar); + container_of(lar, struct lfsck_namespace_req, lnr_lar); struct dt_object *dir = NULL; struct dt_object *obj = NULL; struct lfsck_assistant_object *lso = lar->lar_parent; @@ -5128,8 +5581,8 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, struct lustre_handle lh = 
{ 0 }; bool repaired = false; bool dtlocked = false; - bool remove; - bool newdata; + bool remove = false; + bool newdata = false; bool log = false; bool bad_hash = false; bool bad_linkea = false; @@ -5153,11 +5606,9 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, repaired = true; } - if (unlikely(fid_is_zero(&lnr->lnr_fid))) { - if (strcmp(lnr->lnr_name, dotdot) != 0) - LBUG(); - else - rc = lfsck_namespace_trace_update(env, com, pfid, + if (unlikely(fid_is_zero(&lnr->lnr_fid) && + strcmp(lnr->lnr_name, dotdot) == 0)) { + rc = lfsck_namespace_trace_update(env, com, pfid, LNTF_CHECK_PARENT, true); GOTO(out, rc); @@ -5276,7 +5727,7 @@ again: if (rc != 0) GOTO(out, rc); - handle = dt_trans_create(env, dev); + handle = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(handle)) GOTO(out, rc = PTR_ERR(handle)); @@ -5374,6 +5825,17 @@ again: newdata = true; nodata: + if (rc == -ENOENT && + linkea_will_overflow(&ldata, cname)) { + CDEBUG(D_INODE, "No enough space to hold linkea entry '" + DFID": %.*s' at %u\n", PFID(pfid), + cname->ln_namelen, cname->ln_name, + ldata.ld_leh->leh_overflow_time); + log = true; + rc = 0; + goto stop; + } + if (bk->lb_param & LPF_DRYRUN) { if (rc == -ENODATA) ns->ln_flags |= LF_UPGRADE; @@ -5429,7 +5891,7 @@ nodata: GOTO(stop, rc); } - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_add_buf(&ldata, cname, pfid, false); if (rc == 0) rc = lfsck_links_write(env, obj, &ldata, handle); if (rc != 0) @@ -5516,6 +5978,17 @@ out: if (obj != NULL && count == 1 && S_ISREG(lfsck_object_type(obj))) dt_attr_get(env, obj, la); + + /* if new linkea entry is added, the old entry may be stale, + * check it in phase 2. Sigh, linkea check can only be done + * locally. 
+ */ + if (bad_linkea && !remove && !newdata && + !dt_object_remote(obj) && count > 1) + rc = lfsck_namespace_trace_update(env, com, + &lnr->lnr_fid, + LNTF_CHECK_LINKEA, + true); } trace: @@ -5653,13 +6126,19 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, int rc = 0; __u8 flags = 0; bool exist = false; + ENTRY; child = lfsck_object_find_by_dev(env, dev, &ent->lde_fid); if (IS_ERR(child)) RETURN(PTR_ERR(child)); - LASSERT(dt_object_exists(child)); + if (!dt_object_exists(child)) { + CDEBUG(D_LFSCK, "%s: lost+found/%s doesn't exist\n", + lfsck_lfsck2name(lfsck), ent->lde_name); + GOTO(out, rc = -ENOENT); + } + LASSERT(!dt_object_remote(child)); idx = lfsck_sub_trace_file_fid2idx(&ent->lde_fid); @@ -5676,7 +6155,7 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, GOTO(out, rc); } - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); @@ -5747,7 +6226,7 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, /* b5. insert child's FID into the LFSCK trace file. */ rc = dt_insert(env, obj, (const struct dt_rec *)&flags, - (const struct dt_key *)key, th, 1); + (const struct dt_key *)key, th); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -6340,7 +6819,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, int rc = 0; ENTRY; - if (!lad->lad_incomplete) + if (!test_bit(LAD_INCOMPLETE, &lad->lad_flags)) RETURN_EXIT; set = ptlrpc_prep_set(); @@ -6368,7 +6847,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, } up_read(&ltds->ltd_rw_sem); - rc = ptlrpc_set_wait(set); + rc = ptlrpc_set_wait(env, set); ptlrpc_set_destroy(set); GOTO(out, rc); @@ -6383,7 +6862,7 @@ out: EXIT; } -struct lfsck_assistant_operations lfsck_namespace_assistant_ops = { +const struct lfsck_assistant_operations lfsck_namespace_assistant_ops = { .la_handler_p1 = lfsck_namespace_assistant_handler_p1, .la_handler_p2 = lfsck_namespace_assistant_handler_p2, .la_fill_pos = lfsck_namespace_assistant_fill_pos, @@ -6405,8 +6884,9 @@ struct lfsck_assistant_operations lfsck_namespace_assistant_ops = { * \retval 0 for success * \retval negative error number on failure */ -int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, - const struct lu_name *cname, const struct lu_fid *pfid) +int lfsck_verify_linkea(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, const struct lu_name *cname, + const struct lu_fid *pfid) { struct dt_device *dev = lfsck_obj2dev(obj); struct linkea_data ldata = { NULL }; @@ -6415,9 +6895,14 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, int rc; int fl = LU_XATTR_CREATE; bool dirty = false; + ENTRY; - LASSERT(S_ISDIR(lfsck_object_type(obj))); + if (!dt_object_exists(obj)) + RETURN(-ENOENT); + + if (!S_ISDIR(lfsck_object_type(obj))) + RETURN(-ENOTDIR); rc = lfsck_links_read_with_rec(env, obj, &ldata); if (rc == -ENODATA) { @@ -6443,7 +6928,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf, ldata.ld_leh->leh_len); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if
(IS_ERR(th)) RETURN(PTR_ERR(th)); @@ -6494,6 +6979,9 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, linkea_first_entry(&ldata); linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid); + if (!linkea_entry_is_valid(&ldata, cname, pfid)) + return -EINVAL; + /* To guarantee the 'name' is terminated with '0'. */ memcpy(name, cname->ln_name, cname->ln_namelen); name[cname->ln_namelen] = 0; @@ -6534,7 +7022,7 @@ int lfsck_update_name_entry(const struct lu_env *env, if (rc != 0) RETURN(rc); - th = dt_trans_create(env, dev); + th = lfsck_trans_create(env, dev, lfsck); if (IS_ERR(th)) GOTO(unlock, rc = PTR_ERR(th)); @@ -6567,7 +7055,7 @@ int lfsck_update_name_entry(const struct lu_env *env, GOTO(stop, rc); rc = dt_insert(env, dir, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, 1); + (const struct dt_key *)name, th); if (rc == 0 && S_ISDIR(type) && !exists) { dt_write_lock(env, dir, 0); rc = dt_ref_add(env, dir, th); @@ -6646,13 +7134,16 @@ int lfsck_namespace_setup(const struct lu_env *env, com->lc_obj = obj; rc = lfsck_namespace_load(env, com); - if (rc == -ENODATA) + if (rc == -ENODATA) { rc = lfsck_namespace_init(env, com); - else if (rc < 0) + } else if (rc < 0) { rc = lfsck_namespace_reset(env, com, true); - else + } else { rc = lfsck_load_sub_trace_files(env, com, &dt_lfsck_namespace_features, LFSCK_NAMESPACE, false); + if (rc) + rc = lfsck_namespace_reset(env, com, true); + } if (rc != 0) GOTO(out, rc); @@ -6669,14 +7160,14 @@ int lfsck_namespace_setup(const struct lu_env *env, default: CERROR("%s: unknown lfsck_namespace status %d\n", lfsck_lfsck2name(lfsck), ns->ln_status); - /* fall through */ + fallthrough; case LS_SCANNING_PHASE1: case LS_SCANNING_PHASE2: /* No need to store the status to disk right now. * If the system crashed before the status stored, * it will be loaded back when next time. 
*/ ns->ln_status = LS_CRASHED; - /* fall through */ + fallthrough; case LS_PAUSED: case LS_CRASHED: spin_lock(&lfsck->li_lock);