X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=1cb3963a5002c0c0161a0a926dd3cb650654a8b2;hp=529b5d1b0f7753894a75cc07d94148953d3732a6;hb=555d02f47401340182b47b3245a657b52fc3e68a;hpb=755c04eba3f52245ecddd5c0d7a205988181d7d1 diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 529b5d1..1cb3963 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2014, Intel Corporation. + * Copyright (c) 2013, 2016, Intel Corporation. */ /* * lustre/lfsck/lfsck_namespace.c @@ -59,6 +59,7 @@ enum lfsck_nameentry_check { static struct lfsck_namespace_req * lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck, + struct lfsck_assistant_object *lso, struct lu_dirent *ent, __u16 type) { struct lfsck_namespace_req *lnr; @@ -70,10 +71,9 @@ lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck, return ERR_PTR(-ENOMEM); INIT_LIST_HEAD(&lnr->lnr_lar.lar_list); - lnr->lnr_lar.lar_fid = *lfsck_dto2fid(lfsck->li_obj_dir); + lnr->lnr_lar.lar_parent = lfsck_assistant_object_get(lso); lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv); lnr->lnr_fid = ent->lde_fid; - lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie; lnr->lnr_dir_cookie = ent->lde_hash; lnr->lnr_attr = ent->lde_attrs; lnr->lnr_size = size; @@ -93,6 +93,7 @@ static void lfsck_namespace_assistant_req_fini(const struct lu_env *env, if (lnr->lnr_lmv != NULL) lfsck_lmv_put(env, lnr->lnr_lmv); + lfsck_assistant_object_put(env, lar->lar_parent); OBD_FREE(lnr, lnr->lnr_size); } @@ -163,6 +164,9 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped); dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed); dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size); + dst->ln_time_latest_reset = le32_to_cpu(src->ln_time_latest_reset); + 
dst->ln_linkea_overflow_cleared = + le64_to_cpu(src->ln_linkea_overflow_cleared); } static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, @@ -232,6 +236,9 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped); dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed); dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size); + dst->ln_time_latest_reset = cpu_to_le32(src->ln_time_latest_reset); + dst->ln_linkea_overflow_cleared = + cpu_to_le64(src->ln_linkea_overflow_cleared); } static void lfsck_namespace_record_failure(const struct lu_env *env, @@ -247,7 +254,7 @@ static void lfsck_namespace_record_failure(const struct lu_env *env, ns->ln_pos_first_inconsistent = pos; CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired " - "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n", + "inconsistency at the pos [%llu, "DFID", %#llx]\n", lfsck_lfsck2name(lfsck), ns->ln_pos_first_inconsistent.lp_oit_cookie, PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent), @@ -270,7 +277,7 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, struct dt_object *obj = com->lc_obj; struct lfsck_assistant_data *lad = com->lc_data; struct lfsck_namespace *ns = com->lc_file_ram; - cfs_bitmap_t *bitmap = lad->lad_bitmap; + struct cfs_bitmap *bitmap = lad->lad_bitmap; ssize_t size; __u32 nbits; int rc; @@ -287,7 +294,7 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, if (nbits > bitmap->size) { __u32 new_bits = bitmap->size; - cfs_bitmap_t *new_bitmap; + struct cfs_bitmap *new_bitmap; while (new_bits < nbits) new_bits <<= 1; @@ -311,7 +318,7 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, size = (ns->ln_bitmap_size + 7) >> 3; rc = dt_xattr_get(env, obj, lfsck_buf_get(env, bitmap->data, size), - XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA); + XATTR_NAME_LFSCK_BITMAP); if (rc != size) RETURN(rc >= 0 ? 
-EINVAL : rc); @@ -326,12 +333,6 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, /** * Load namespace LFSCK statistics information from the trace file. * - * For old release (Lustre-2.6 or older), the statistics information was - * stored as XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But in Lustre-2.7, we need - * more statistics information. To avoid confusing old MDT when downgrade, - * Lustre-2.7 stores the namespace LFSCK statistics information as new - * XATTR_NAME_LFSCK_NAMESPACE EA. - * * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * @@ -346,7 +347,7 @@ static int lfsck_namespace_load(const struct lu_env *env, rc = dt_xattr_get(env, com->lc_obj, lfsck_buf_get(env, com->lc_file_disk, len), - XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA); + XATTR_NAME_LFSCK_NAMESPACE); if (rc == len) { struct lfsck_namespace *ns = com->lc_file_ram; @@ -366,34 +367,24 @@ static int lfsck_namespace_load(const struct lu_env *env, lfsck_lfsck2name(com->lc_lfsck), len, rc); if (rc >= 0) rc = -ESTALE; - } else { - /* Check whether it is old trace file or not. - * If yes, it should be reset via returning -ESTALE. 
*/ - rc = dt_xattr_get(env, com->lc_obj, - lfsck_buf_get(env, com->lc_file_disk, len), - XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA); - if (rc >= 0) - rc = -ESTALE; } return rc; } static int lfsck_namespace_store(const struct lu_env *env, - struct lfsck_component *com, bool init) + struct lfsck_component *com) { struct dt_object *obj = com->lc_obj; struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_assistant_data *lad = com->lc_data; - cfs_bitmap_t *bitmap = NULL; + struct dt_device *dev = lfsck_obj2dev(obj); + struct cfs_bitmap *bitmap = NULL; struct thandle *handle; __u32 nbits = 0; int len = com->lc_file_size; int rc; -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) - struct lu_buf tbuf = { &len, sizeof(len) }; -#endif ENTRY; if (lad != NULL) { @@ -407,7 +398,7 @@ static int lfsck_namespace_store(const struct lu_env *env, ns->ln_bitmap_size = nbits; lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk, ns); - handle = dt_trans_create(env, lfsck->li_bottom); + handle = dt_trans_create(env, dev); if (IS_ERR(handle)) GOTO(log, rc = PTR_ERR(handle)); @@ -425,44 +416,22 @@ static int lfsck_namespace_store(const struct lu_env *env, GOTO(out, rc); } -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) - /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy - * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x, - * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA, - * then reset the namespace LFSCK trace file. 
*/ - if (init) { - rc = dt_declare_xattr_set(env, obj, &tbuf, - XATTR_NAME_LFSCK_NAMESPACE_OLD, - LU_XATTR_CREATE, handle); - if (rc != 0) - GOTO(out, rc); - } -#endif - - rc = dt_trans_start_local(env, lfsck->li_bottom, handle); + rc = dt_trans_start_local(env, dev, handle); if (rc != 0) GOTO(out, rc); rc = dt_xattr_set(env, obj, lfsck_buf_get(env, com->lc_file_disk, len), - XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA); + XATTR_NAME_LFSCK_NAMESPACE, 0, handle); if (rc == 0 && bitmap != NULL) rc = dt_xattr_set(env, obj, lfsck_buf_get(env, bitmap->data, nbits >> 3), - XATTR_NAME_LFSCK_BITMAP, 0, handle, - BYPASS_CAPA); - -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) - if (rc == 0 && init) - rc = dt_xattr_set(env, obj, &tbuf, - XATTR_NAME_LFSCK_NAMESPACE_OLD, - LU_XATTR_CREATE, handle, BYPASS_CAPA); -#endif + XATTR_NAME_LFSCK_BITMAP, 0, handle); GOTO(out, rc); out: - dt_trans_stop(env, lfsck->li_bottom, handle); + dt_trans_stop(env, dev, handle); log: if (rc != 0) @@ -471,70 +440,6 @@ log: return rc; } -static struct dt_object * -lfsck_namespace_load_one_trace_file(const struct lu_env *env, - struct lfsck_component *com, - struct dt_object *parent, - const char *name, - const struct dt_index_features *ft, - bool reset) -{ - struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_object *obj; - int rc; - - if (reset) { - rc = local_object_unlink(env, lfsck->li_bottom, parent, name); - if (rc != 0 && rc != -ENOENT) - return ERR_PTR(rc); - } - - if (ft != NULL) - obj = local_index_find_or_create(env, lfsck->li_los, parent, - name, S_IFREG | S_IRUGO | S_IWUSR, ft); - else - obj = local_file_find_or_create(env, lfsck->li_los, parent, - name, S_IFREG | S_IRUGO | S_IWUSR); - - return obj; -} - -static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env, - struct lfsck_component *com, - bool reset) -{ - char *name = lfsck_env_info(env)->lti_key; - struct lfsck_sub_trace_obj *lsto; - struct dt_object *obj; - int rc; - int i; - - for (i = 0, 
lsto = &com->lc_sub_trace_objs[0]; - i < LFSCK_STF_COUNT; i++, lsto++) { - snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i); - if (lsto->lsto_obj != NULL) { - if (!reset) - continue; - - lu_object_put(env, &lsto->lsto_obj->do_lu); - lsto->lsto_obj = NULL; - } - - obj = lfsck_namespace_load_one_trace_file(env, com, - com->lc_lfsck->li_lfsck_dir, - name, &dt_lfsck_features, reset); - if (IS_ERR(obj)) - return PTR_ERR(obj); - - lsto->lsto_obj = obj; - rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features); - if (rc != 0) - return rc; - } - - return 0; -} - static int lfsck_namespace_init(const struct lu_env *env, struct lfsck_component *com) { @@ -544,11 +449,13 @@ static int lfsck_namespace_init(const struct lu_env *env, memset(ns, 0, sizeof(*ns)); ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; + ns->ln_time_latest_reset = cfs_time_current_sec(); down_write(&com->lc_sem); - rc = lfsck_namespace_store(env, com, true); - up_write(&com->lc_sem); + rc = lfsck_namespace_store(env, com); if (rc == 0) - rc = lfsck_namespace_load_sub_trace_files(env, com, true); + rc = lfsck_load_sub_trace_files(env, com, + &dt_lfsck_namespace_features, LFSCK_NAMESPACE, true); + up_write(&com->lc_sem); return rc; } @@ -562,7 +469,7 @@ static int lfsck_namespace_init(const struct lu_env *env, * trace file * \param[in] add true if add new flags, otherwise remove flags * - * \retval 0 for succeed or nothing to be done + * \retval 0 for success or nothing to be done * \retval negative error number on failure */ int lfsck_namespace_trace_update(const struct lu_env *env, @@ -573,7 +480,7 @@ int lfsck_namespace_trace_update(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_object *obj; struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev; struct thandle *th = NULL; int idx; int rc = 0; @@ -587,11 +494,18 @@ int lfsck_namespace_trace_update(const struct lu_env *env, 
RETURN(0); idx = lfsck_sub_trace_file_fid2idx(fid); - obj = com->lc_sub_trace_objs[idx].lsto_obj; mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); + obj = com->lc_sub_trace_objs[idx].lsto_obj; + if (unlikely(obj == NULL)) { + mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); + RETURN(0); + } + + lfsck_object_get(obj); + dev = lfsck_obj2dev(obj); fid_cpu_to_be(key, fid); rc = dt_lookup(env, obj, (struct dt_rec *)&old, - (const struct dt_key *)key, BYPASS_CAPA); + (const struct dt_key *)key); if (rc == -ENOENT) { if (!add) GOTO(unlock, rc = 0); @@ -638,15 +552,14 @@ int lfsck_namespace_trace_update(const struct lu_env *env, GOTO(log, rc); if (old != 0) { - rc = dt_delete(env, obj, (const struct dt_key *)key, - th, BYPASS_CAPA); + rc = dt_delete(env, obj, (const struct dt_key *)key, th); if (rc != 0) GOTO(log, rc); } if (new != 0) { rc = dt_insert(env, obj, (const struct dt_rec *)&new, - (const struct dt_key *)key, th, BYPASS_CAPA, 1); + (const struct dt_key *)key, th, 1); if (rc != 0) GOTO(log, rc); } @@ -664,6 +577,7 @@ log: unlock: mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); + lfsck_object_put(env, obj); return rc; } @@ -680,7 +594,7 @@ int lfsck_namespace_check_exist(const struct lu_env *env, RETURN(LFSCK_NAMEENTRY_DEAD); rc = dt_lookup(env, dir, (struct dt_rec *)fid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc == -ENOENT) RETURN(LFSCK_NAMEENTRY_REMOVED); @@ -699,20 +613,23 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env, { int rc; + /* For remote updating LINKEA, there may be further LFSCK action + * on remote MDT after the updating, so update the LINKEA ASAP. */ + if (dt_object_remote(obj)) + handle->th_sync = 1; + /* For destroying all invalid linkEA entries. */ rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle); - if (rc != 0) - return rc; - - /* For insert new linkEA entry. 
*/ - rc = dt_declare_xattr_set(env, obj, - lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE), + if (rc == 0) + /* For insert new linkEA entry. */ + rc = dt_declare_xattr_set(env, obj, + lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE), XATTR_NAME_LINK, 0, handle); return rc; } int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj, - struct linkea_data *ldata) + struct linkea_data *ldata, bool with_rec) { int rc; @@ -722,24 +639,32 @@ int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj, if (!dt_object_exists(obj)) return -ENOENT; - rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA); + rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK); if (rc == -ERANGE) { /* Buf was too small, figure out what we need. */ - rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK, - BYPASS_CAPA); - if (rc <= 0) + rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK); + if (unlikely(rc == 0)) + return -ENODATA; + + if (rc < 0) return rc; lu_buf_realloc(ldata->ld_buf, rc); if (ldata->ld_buf->lb_buf == NULL) return -ENOMEM; - rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, - BYPASS_CAPA); + rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK); } - if (rc > 0) - rc = linkea_init(ldata); + if (unlikely(rc == 0)) + return -ENODATA; + + if (rc > 0) { + if (with_rec) + rc = linkea_init_with_rec(ldata); + else + rc = linkea_init(ldata); + } return rc; } @@ -761,7 +686,7 @@ static int lfsck_namespace_links_remove(const struct lu_env *env, struct dt_object *obj) { struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(obj); struct thandle *th = NULL; int rc = 0; ENTRY; @@ -787,7 +712,7 @@ static int lfsck_namespace_links_remove(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 0); - rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA); + rc = dt_xattr_del(env, obj, 
XATTR_NAME_LINK, th); GOTO(unlock, rc); @@ -814,24 +739,77 @@ log: static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj, struct linkea_data *ldata, struct thandle *handle) { - const struct lu_buf *buf = lfsck_buf_get_const(env, - ldata->ld_buf->lb_buf, - ldata->ld_leh->leh_len); + struct lu_buf buf; + int rc; + + lfsck_buf_init(&buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len); + +again: + rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LINK, 0, handle); + if (unlikely(rc == -ENOSPC)) { + rc = linkea_overflow_shrink(ldata); + if (likely(rc > 0)) { + buf.lb_len = rc; + goto again; + } + } - return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle, - BYPASS_CAPA); + return rc; } -static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata, - struct lu_name *cname, - struct lu_fid *pfid, - char *buf) +static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata, + struct lu_name *cname, + struct lu_fid *pfid, + char *buf, const int buflen) { linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid); + if (unlikely(ldata->ld_reclen <= 0 || + ldata->ld_reclen + sizeof(struct link_ea_header) > + ldata->ld_leh->leh_len || + cname->ln_namelen <= 0 || + cname->ln_namelen > NAME_MAX || + cname->ln_namelen >= buflen || + !fid_is_sane(pfid))) + return -EINVAL; + /* To guarantee the 'name' is terminated with '0'. */ memcpy(buf, cname->ln_name, cname->ln_namelen); buf[cname->ln_namelen] = 0; cname->ln_name = buf; + + return 0; +} + +static void lfsck_linkea_del_buf(struct linkea_data *ldata, + const struct lu_name *lname) +{ + LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL); + + /* If current record is corrupted, all the subsequent + * records will be dropped. 
*/ + if (unlikely(ldata->ld_reclen <= 0 || + ldata->ld_reclen + sizeof(struct link_ea_header) > + ldata->ld_leh->leh_len)) { + void *ptr = ldata->ld_lee; + + ldata->ld_leh->leh_len = sizeof(struct link_ea_header); + ldata->ld_leh->leh_reccount = 0; + linkea_first_entry(ldata); + while (ldata->ld_lee != NULL && + (char *)ldata->ld_lee < (char *)ptr) { + int reclen = (ldata->ld_lee->lee_reclen[0] << 8) | + ldata->ld_lee->lee_reclen[1]; + + ldata->ld_leh->leh_len += reclen; + ldata->ld_leh->leh_reccount++; + ldata->ld_lee = (struct link_ea_entry *) + ((char *)ldata->ld_lee + reclen); + } + + ldata->ld_lee = NULL; + } else { + linkea_del_buf(ldata, lname); + } } static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata, @@ -855,7 +833,7 @@ static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata, if (!remove) break; - linkea_del_buf(ldata, cname); + lfsck_linkea_del_buf(ldata, cname); } else { linkea_next_entry(ldata); } @@ -913,17 +891,17 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lu_attr *la = &info->lti_la3; + struct lu_attr *la = &info->lti_la2; const struct lu_fid *cfid = lfsck_dto2fid(orphan); const struct lu_fid *pfid; struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(orphan); struct dt_object *parent; struct thandle *th = NULL; - struct lustre_handle plh = { 0 }; + struct lfsck_lock_handle *pllh = &info->lti_llh; struct lustre_handle clh = { 0 }; - struct linkea_data ldata = { NULL }; + struct linkea_data ldata2 = { NULL }; struct lu_buf linkea_buf; int namelen; int idx = 0; @@ -938,18 +916,12 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, parent = lfsck->li_lpf_obj; pfid = lfsck_dto2fid(parent); - /* Hold update lock on the parent to 
prevent others to access. */ - rc = lfsck_ibits_lock(env, lfsck, parent, &plh, - MDS_INODELOCK_UPDATE, LCK_EX); - if (rc != 0) - GOTO(log, rc); - +again: do { namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d", PFID(cfid), infix, type, idx++); rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)info->lti_key, - BYPASS_CAPA); + (const struct dt_key *)info->lti_key); if (rc != 0 && rc != -ENOENT) GOTO(log, rc); @@ -957,24 +929,44 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, exist = true; } while (rc == 0 && !exist); - cname->ln_name = info->lti_key; - cname->ln_namelen = namelen; - rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); + rc = lfsck_lock(env, lfsck, parent, info->lti_key, pllh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); - rc = linkea_add_buf(&ldata, cname, pfid); + /* Re-check whether the name conflicts with others after taking + * the ldlm lock. */ + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, + (const struct dt_key *)info->lti_key); + if (rc == 0) { + if (!lu_fid_eq(cfid, &tfid)) { + exist = false; + lfsck_unlock(pllh); + goto again; + } + + exist = true; + } else if (rc != -ENOENT) { + GOTO(log, rc); + } else { + exist = false; + } + + cname->ln_name = info->lti_key; + cname->ln_namelen = namelen; + rc = linkea_links_new(&ldata2, &info->lti_linkea_buf2, + cname, pfid); if (rc != 0) GOTO(log, rc); rc = lfsck_ibits_lock(env, lfsck, orphan, &clh, - MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP, - LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_XATTR, LCK_EX); if (rc != 0) GOTO(log, rc); - lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf, - ldata.ld_leh->leh_len); + lfsck_buf_init(&linkea_buf, ldata2.ld_buf->lb_buf, + ldata2.ld_leh->leh_len); th = dt_trans_create(env, dev); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -1026,33 +1018,30 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, GOTO(stop, rc); dt_write_lock(env, orphan, 
0); - rc = lfsck_links_read(env, orphan, &ldata); - if (likely((rc == -ENODATA) || (rc == -EINVAL) || - (rc == 0 && ldata.ld_leh->leh_reccount == 0))) { + rc = lfsck_links_read2_with_rec(env, orphan, &ldata2); + if (likely(rc == -ENODATA || rc == -EINVAL)) { if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 1); if (S_ISDIR(lfsck_object_type(orphan))) { rc = dt_delete(env, orphan, - (const struct dt_key *)dotdot, th, - BYPASS_CAPA); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(unlock, rc); rec->rec_type = S_IFDIR; rec->rec_fid = pfid; rc = dt_insert(env, orphan, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, - BYPASS_CAPA, 1); + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock, rc); } rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0, - th, BYPASS_CAPA); + th); } else { if (rc == 0 && count != NULL) - *count = ldata.ld_leh->leh_reccount; + *count = ldata2.ld_leh->leh_reccount; GOTO(unlock, rc); } @@ -1062,8 +1051,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, rec->rec_type = lfsck_object_type(orphan) & S_IFMT; rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)cname->ln_name, - th, BYPASS_CAPA, 1); + (const struct dt_key *)cname->ln_name, th, 1); if (rc == 0 && S_ISDIR(rec->rec_type)) { dt_write_lock(env, parent, 0); rc = dt_ref_add(env, parent, th); @@ -1072,7 +1060,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, } if (rc == 0) - rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA); + rc = dt_attr_set(env, orphan, la, th); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -1084,7 +1072,7 @@ stop: log: lfsck_ibits_unlock(&clh, LCK_EX); - lfsck_ibits_unlock(&plh, LCK_EX); + lfsck_unlock(pllh); CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the " "object "DFID", name = %s: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(cfid), @@ -1131,21 +1119,34 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, struct lu_attr *la = &info->lti_la; struct dt_insert_rec *rec = &info->lti_dt_rec; struct lfsck_instance *lfsck = com->lc_lfsck; + /* The child and its name may be on different MDTs. */ + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); struct dt_device *dev = lfsck->li_next; struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; int rc = 0; ENTRY; + /* @parent/@child may be based on lfsck->li_bottom, + * but here we need the object based on the lfsck->li_next. */ + + parent = lfsck_object_locate(dev, parent); + if (IS_ERR(parent)) + GOTO(log, rc = PTR_ERR(parent)); + if (unlikely(!dt_try_as_dir(env, parent))) GOTO(log, rc = -ENOTDIR); + child = lfsck_object_locate(dev, child); + if (IS_ERR(child)) + GOTO(log, rc = PTR_ERR(child)); + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(log, rc = 1); - /* Hold update lock on the parent to prevent others to access. 
*/ - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -1154,7 +1155,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, GOTO(unlock, rc = PTR_ERR(th)); rec->rec_type = lfsck_object_type(child) & S_IFMT; - rec->rec_fid = lfsck_dto2fid(child); + rec->rec_fid = cfid; rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, (const struct dt_key *)name, th); if (rc != 0) @@ -1182,7 +1183,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, GOTO(stop, rc); rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + (const struct dt_key *)name, th, 1); if (rc != 0) GOTO(stop, rc); @@ -1195,11 +1196,11 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, } la->la_ctime = cfs_time_current_sec(); - rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA); + rc = dt_attr_set(env, parent, la, th); if (rc != 0) GOTO(stop, rc); - rc = dt_attr_set(env, child, la, th, BYPASS_CAPA); + rc = dt_attr_set(env, child, la, th); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -1207,14 +1208,13 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with " "the name %s and type %o to the parent "DFID": rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name, - lfsck_object_type(child) & S_IFMT, - PFID(lfsck_dto2fid(parent)), rc); + lfsck_lfsck2name(lfsck), PFID(cfid), name, + lfsck_object_type(child) & S_IFMT, PFID(pfid), rc); if (rc != 0) { struct lfsck_namespace *ns = com->lc_file_ram; @@ -1259,11 +1259,10 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace *ns = com->lc_file_ram; - struct dt_device *dev; + struct dt_device *dev = lfsck_obj2dev(orphan); struct dt_object *parent = NULL; - struct dt_object *child = NULL; struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; struct lu_buf lmv_buf; @@ -1271,6 +1270,7 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, int namelen; int idx = 0; int rc = 0; + int rc1 = 0; ENTRY; LASSERT(!dt_object_exists(orphan)); @@ -1289,7 +1289,7 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, snprintf(name, 8, "MDT%04x", idx); rc = dt_lookup(env, lfsck->li_lpf_root_obj, (struct dt_rec *)&tfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc != 0) GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc)); @@ -1310,26 +1310,35 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, if (IS_ERR(dev)) GOTO(log, rc = PTR_ERR(dev)); - child = lfsck_object_find_by_dev(env, dev, cfid); - if (IS_ERR(child)) - GOTO(log, rc = PTR_ERR(child)); - - /* Hold update lock on the parent to prevent others to access. 
*/ - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); - if (rc != 0) - GOTO(log, rc); - idx = 0; + +again: do { namelen = snprintf(name, 31, DFID"-P-%d", PFID(cfid), idx++); rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc != 0 && rc != -ENOENT) - GOTO(unlock1, rc); + GOTO(log, rc); } while (rc == 0); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); + if (rc != 0) + GOTO(log, rc); + + /* Re-check whether the name conflicts with others after taking + * the ldlm lock. */ + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, + (const struct dt_key *)name); + if (unlikely(rc == 0)) { + lfsck_unlock(llh); + goto again; + } + + if (rc != -ENOENT) + GOTO(unlock1, rc); + cname->ln_name = name; cname->ln_namelen = namelen; @@ -1338,17 +1347,14 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | LA_ATIME | LA_MTIME | LA_CTIME; - child->do_ops->do_ah_init(env, hint, parent, child, - la->la_mode & S_IFMT); + orphan->do_ops->do_ah_init(env, hint, parent, orphan, + la->la_mode & S_IFMT); memset(dof, 0, sizeof(*dof)); dof->dof_type = dt_mode_to_dft(S_IFDIR); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); - if (rc != 0) - GOTO(unlock1, rc); - - rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf2, + cname, lfsck_dto2fid(parent)); if (rc != 0) GOTO(unlock1, rc); @@ -1361,39 +1367,36 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, if (dt_object_remote(orphan)) th->th_sync = 1; - rc = dt_declare_create(env, child, la, hint, dof, th); + rc = dt_declare_create(env, orphan, la, hint, dof, th); if (rc != 0) GOTO(stop, rc); - if (unlikely(!dt_try_as_dir(env, child))) + if (unlikely(!dt_try_as_dir(env, orphan))) GOTO(stop, rc = -ENOTDIR); + rc = dt_declare_ref_add(env, orphan, 
th); + if (rc != 0) + GOTO(stop, rc); + rec->rec_type = S_IFDIR; rec->rec_fid = cfid; - rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, + rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec, (const struct dt_key *)dot, th); if (rc != 0) GOTO(stop, rc); rec->rec_fid = lfsck_dto2fid(parent); - rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, + rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec, (const struct dt_key *)dotdot, th); - if (rc == 0) - rc = dt_declare_ref_add(env, child, th); - - if (rc != 0) - GOTO(stop, rc); - - rc = dt_declare_ref_add(env, child, th); if (rc != 0) GOTO(stop, rc); if (lmv != NULL) { lmv->lmv_magic = LMV_MAGIC; - lmv->lmv_master_mdt_index = lfsck_dev_idx(dev); + lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck); lfsck_lmv_header_cpu_to_le(lmv2, lmv); lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2)); - rc = dt_declare_xattr_set(env, child, &lmv_buf, + rc = dt_declare_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(stop, rc); @@ -1401,7 +1404,7 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf, ldata.ld_leh->leh_len); - rc = dt_declare_xattr_set(env, child, &linkea_buf, + rc = dt_declare_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(stop, rc); @@ -1419,44 +1422,42 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - dt_write_lock(env, child, 0); - rc = dt_create(env, child, la, hint, dof, th); + dt_write_lock(env, orphan, 0); + rc = dt_create(env, orphan, la, hint, dof, th); if (rc != 0) GOTO(unlock2, rc); - rec->rec_fid = cfid; - rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, BYPASS_CAPA, 1); + rc = dt_ref_add(env, orphan, th); if (rc != 0) GOTO(unlock2, rc); - rec->rec_fid = lfsck_dto2fid(parent); - rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct 
dt_key *)dotdot, th, - BYPASS_CAPA, 1); + rec->rec_fid = cfid; + rc = dt_insert(env, orphan, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, 1); if (rc != 0) GOTO(unlock2, rc); - rc = dt_ref_add(env, child, th); + rec->rec_fid = lfsck_dto2fid(parent); + rc = dt_insert(env, orphan, (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock2, rc); if (lmv != NULL) { - rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0, - th, BYPASS_CAPA); + rc = dt_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(unlock2, rc); } - rc = dt_xattr_set(env, child, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); - dt_write_unlock(env, child); + rc = dt_xattr_set(env, orphan, &linkea_buf, + XATTR_NAME_LINK, 0, th); + dt_write_unlock(env, orphan); if (rc != 0) GOTO(stop, rc); rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + (const struct dt_key *)name, th, 1); if (rc == 0) { dt_write_lock(env, parent, 0); rc = dt_ref_add(env, parent, th); @@ -1466,13 +1467,15 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, GOTO(stop, rc = (rc == 0 ? 1 : rc)); unlock2: - dt_write_unlock(env, child); + dt_write_unlock(env, orphan); stop: - dt_trans_stop(env, dev, th); + rc1 = dt_trans_stop(env, dev, th); + if (rc1 != 0 && rc > 0) + rc = rc1; unlock1: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for " @@ -1480,9 +1483,6 @@ log: lfsck_lfsck2name(lfsck), PFID(cfid), cname->ln_name != NULL ? 
cname->ln_name : "", rc); - if (child != NULL && !IS_ERR(child)) - lfsck_object_put(env, child); - if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj) lfsck_object_put(env, parent); @@ -1521,36 +1521,44 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env, bool next) { struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(obj); struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct thandle *th = NULL; struct lustre_handle lh = { 0 }; struct linkea_data ldata_new = { NULL }; struct lu_buf linkea_buf; + int buflen = 0; int rc = 0; ENTRY; rc = lfsck_ibits_lock(env, lfsck, obj, &lh, - MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + LCK_EX); if (rc != 0) GOTO(log, rc); if (next) - linkea_del_buf(ldata, cname); + lfsck_linkea_del_buf(ldata, cname); else lfsck_namespace_filter_linkea_entry(ldata, cname, pfid, true); - lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, - ldata->ld_leh->leh_len); + if (ldata->ld_leh->leh_reccount > 0 || + unlikely(ldata->ld_leh->leh_overflow_time)) { + lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, + ldata->ld_leh->leh_len); + buflen = linkea_buf.lb_len; + } again: th = dt_trans_create(env, dev); if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); - rc = dt_declare_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th); + if (buflen != 0) + rc = dt_declare_xattr_set(env, obj, &linkea_buf, + XATTR_NAME_LINK, 0, th); + else + rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th); if (rc != 0) GOTO(stop, rc); @@ -1562,9 +1570,9 @@ again: if (unlikely(lfsck_is_dead_obj(obj))) GOTO(unlock2, rc = -ENOENT); - rc = lfsck_links_read2(env, obj, &ldata_new); - if (rc != 0) - GOTO(unlock2, rc); + rc = lfsck_links_read2_with_rec(env, obj, &ldata_new); + if (rc) + GOTO(unlock2, rc = (rc == -ENODATA ? 0 : rc)); /* The specified linkEA entry has been removed by race. 
*/ rc = linkea_links_find(&ldata_new, cname, pfid); @@ -1575,12 +1583,12 @@ again: GOTO(unlock2, rc = 1); if (next) - linkea_del_buf(&ldata_new, cname); + lfsck_linkea_del_buf(&ldata_new, cname); else lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid, true); - if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) { + if (buflen < ldata_new.ld_leh->leh_len) { dt_write_unlock(env, obj); dt_trans_stop(env, dev, th); lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf, @@ -1588,10 +1596,11 @@ again: goto again; } - lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf, - ldata_new.ld_leh->leh_len); - rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + if (ldata_new.ld_leh->leh_reccount > 0 || + unlikely(ldata->ld_leh->leh_overflow_time)) + rc = lfsck_links_write(env, obj, &ldata_new, th); + else + rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th); GOTO(unlock2, rc = (rc == 0 ? 1 : rc)); @@ -1647,20 +1656,21 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env, struct lu_name *cname, struct lu_fid *pfid) { - struct lu_fid *cfid = &lfsck_env_info(env)->lti_fid3; - struct lustre_handle lh = { 0 }; - int rc; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_fid *cfid = &info->lti_fid3; + struct lfsck_lock_handle *llh = &info->lti_llh; + int rc; ENTRY; - rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, com->lc_lfsck, parent, cname->ln_name, llh, + MDS_INODELOCK_UPDATE, LCK_PR); if (rc != 0) RETURN(rc); dt_read_lock(env, parent, 0); if (unlikely(lfsck_is_dead_obj(parent))) { dt_read_unlock(env, parent); - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname, pfid, true); @@ -1668,8 +1678,7 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env, } rc = dt_lookup(env, parent, (struct dt_rec *)cfid, - (const struct dt_key *)cname->ln_name, - BYPASS_CAPA); + 
(const struct dt_key *)cname->ln_name); dt_read_unlock(env, parent); /* It is safe to release the ldlm lock, because when the logic come @@ -1679,7 +1688,7 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env, * has removed the specified linkEA entry by race, then it is OK, * because the subsequent lfsck_namespace_shrink_linkea() can handle * such case. */ - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); if (rc == -ENOENT) { rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname, pfid, true); @@ -1738,10 +1747,12 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, struct dt_insert_rec *rec = &info->lti_dt_rec; struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; + /* The child and its name may be on different MDTs. */ struct dt_device *dev = lfsck->li_next; const char *name = cname->ln_name; - struct dt_object *obj = NULL; - struct lustre_handle plh = { 0 }; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + struct dt_object *cobj = NULL; + struct lfsck_lock_handle *pllh = &info->lti_llh; struct lustre_handle clh = { 0 }; struct linkea_data ldata = { NULL }; struct thandle *th = NULL; @@ -1749,8 +1760,18 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, int rc = 0; ENTRY; - rc = lfsck_ibits_lock(env, lfsck, parent, &plh, - MDS_INODELOCK_UPDATE, LCK_EX); + /* @parent/@child may be based on lfsck->li_bottom, + * but here we need the object based on the lfsck->li_next. 
*/ + + parent = lfsck_object_locate(dev, parent); + if (IS_ERR(parent)) + GOTO(log, rc = PTR_ERR(parent)); + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(log, rc = -ENOTDIR); + + rc = lfsck_lock(env, lfsck, parent, name, pllh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -1759,9 +1780,9 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, goto replace; } - obj = lfsck_object_find(env, lfsck, cfid); - if (IS_ERR(obj)) { - rc = PTR_ERR(obj); + cobj = lfsck_object_find_by_dev(env, dev, cfid); + if (IS_ERR(cobj)) { + rc = PTR_ERR(cobj); if (rc == -ENOENT) { exist = false; goto replace; @@ -1770,13 +1791,13 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, GOTO(log, rc); } - if (!dt_object_exists(obj)) { + if (!dt_object_exists(cobj)) { exist = false; goto replace; } rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc == -ENOENT) { exist = false; goto replace; @@ -1790,18 +1811,19 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, GOTO(log, rc = 0); /* lock the object to be destroyed. */ - rc = lfsck_ibits_lock(env, lfsck, obj, &clh, + rc = lfsck_ibits_lock(env, lfsck, cobj, &clh, MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + LCK_EX); if (rc != 0) GOTO(log, rc); - if (unlikely(lfsck_is_dead_obj(obj))) { + if (unlikely(lfsck_is_dead_obj(cobj))) { exist = false; goto replace; } - rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + rc = dt_attr_get(env, cobj, la); if (rc != 0) GOTO(log, rc); @@ -1811,8 +1833,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, GOTO(log, rc); if (S_ISREG(la->la_mode)) { - rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV, - BYPASS_CAPA); + rc = dt_xattr_get(env, cobj, &LU_BUF_NULL, XATTR_NAME_LOV); /* If someone has created related OST-object(s), * then keep it. 
*/ if ((rc > 0) || (rc < 0 && rc != -ENODATA)) @@ -1821,7 +1842,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, replace: dt_read_lock(env, child, 0); - rc = lfsck_links_read2(env, child, &ldata); + rc = lfsck_links_read2_with_rec(env, child, &ldata); dt_read_unlock(env, child); /* Someone changed the child, no need to replace. */ @@ -1831,7 +1852,7 @@ replace: if (rc != 0) GOTO(log, rc); - rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent)); + rc = linkea_links_find(&ldata, cname, pfid); /* Someone moved the child, no need to replace. */ if (rc != 0) GOTO(log, rc = 0); @@ -1844,7 +1865,7 @@ replace: GOTO(log, rc = PTR_ERR(th)); if (exist) { - rc = dt_declare_destroy(env, obj, th); + rc = dt_declare_destroy(env, cobj, th); if (rc != 0) GOTO(stop, rc); } @@ -1860,22 +1881,23 @@ replace: if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc); if (exist) { - rc = dt_destroy(env, obj, th); + rc = dt_destroy(env, cobj, th); if (rc != 0) GOTO(stop, rc); } /* The old name entry maybe not exist. */ - dt_delete(env, parent, (const struct dt_key *)name, th, - BYPASS_CAPA); + rc = dt_delete(env, parent, (const struct dt_key *)name, th); + if (rc != 0 && rc != -ENOENT) + GOTO(stop, rc); rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + (const struct dt_key *)name, th, 1); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -1884,16 +1906,16 @@ stop: log: lfsck_ibits_unlock(&clh, LCK_EX); - lfsck_ibits_unlock(&plh, LCK_EX); - if (obj != NULL && !IS_ERR(obj)) - lfsck_object_put(env, obj); + lfsck_unlock(pllh); + + if (cobj != NULL && !IS_ERR(cobj)) + lfsck_object_put(env, cobj); CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the " "object "DFID" because of conflict with the object "DFID " under the parent "DFID" with name %s: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(cfid), - PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)), - name, rc); + PFID(lfsck_dto2fid(child)), PFID(pfid), name, rc); return rc; } @@ -1918,7 +1940,7 @@ int lfsck_namespace_rebuild_linkea(const struct lu_env *env, struct linkea_data *ldata) { struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(obj); struct thandle *th = NULL; struct lu_buf linkea_buf; int rc = 0; @@ -1947,7 +1969,7 @@ int lfsck_namespace_rebuild_linkea(const struct lu_env *env, GOTO(unlock, rc = 1); rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + XATTR_NAME_LINK, 0, th); GOTO(unlock, rc = (rc == 0 ? 
1 : rc)); @@ -1999,21 +2021,32 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, const char *name, const char *name2, __u16 type, bool update, bool dec) { - struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct lu_fid tfid; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_next; - struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; - int rc = 0; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct dt_insert_rec *rec = &info->lti_dt_rec; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct lu_fid tfid; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct lustre_handle lh = { 0 }; + int rc = 0; ENTRY; + parent = lfsck_object_locate(dev, parent); + if (IS_ERR(parent)) + GOTO(log, rc = PTR_ERR(parent)); + if (unlikely(!dt_try_as_dir(env, parent))) GOTO(log, rc = -ENOTDIR); - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + if (!update || strcmp(name, name2) == 0) + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); + else + rc = lfsck_ibits_lock(env, lfsck, parent, &lh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -2035,19 +2068,20 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, GOTO(stop, rc); } - if (dec) { + if (dec && S_ISDIR(type)) { rc = dt_declare_ref_del(env, parent, th); if (rc != 0) GOTO(stop, rc); } - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc); + dt_write_lock(env, parent, 0); rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); /* Someone has removed the bad name entry by race. 
*/ if (rc == -ENOENT) GOTO(unlock2, rc = 0); @@ -2063,21 +2097,19 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock2, rc = 1); - rc = dt_delete(env, parent, (const struct dt_key *)name, th, - BYPASS_CAPA); + rc = dt_delete(env, parent, (const struct dt_key *)name, th); if (rc != 0) GOTO(unlock2, rc); if (update) { rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name2, th, - BYPASS_CAPA, 1); + (const struct dt_key *)name2, th, 1); if (rc != 0) GOTO(unlock2, rc); } - if (dec) { + if (dec && S_ISDIR(type)) { rc = dt_ref_del(env, parent, th); if (rc != 0) GOTO(unlock2, rc); @@ -2099,14 +2131,16 @@ stop: LNTF_CHECK_LINKEA, true); unlock1: - lfsck_ibits_unlock(&lh, LCK_EX); + /* It is harmless even if unlock the unused lock_handle */ + lfsck_ibits_unlock(&lh, LCK_PW); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name " "entry for: parent "DFID", child "DFID", name %s, type " "in name entry %o, type claimed by child %o. repair it " - "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck), - PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)), + "by %s with new name2 %s: rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid), name, type, update ? lfsck_object_type(child) : 0, update ? 
"updating" : "removing", name2, rc); @@ -2146,7 +2180,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct dt_insert_rec *rec = &info->lti_dt_rec; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(obj); struct thandle *th = NULL; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; @@ -2156,11 +2190,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, LASSERT(!dt_object_remote(obj)); LASSERT(S_ISDIR(lfsck_object_type(obj))); - rc = linkea_data_new(&ldata, &info->lti_big_buf); - if (rc != 0) - GOTO(log, rc); - - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_links_new(&ldata, &info->lti_big_buf, cname, pfid); if (rc != 0) GOTO(log, rc); @@ -2199,16 +2229,14 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, GOTO(unlock, rc = 1); /* The old ".." name entry maybe not exist. */ - dt_delete(env, obj, (const struct dt_key *)dotdot, th, - BYPASS_CAPA); + dt_delete(env, obj, (const struct dt_key *)dotdot, th); rc = dt_insert(env, obj, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1); + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock, rc); - rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + rc = lfsck_links_write(env, obj, &ldata, th); GOTO(unlock, rc = (rc == 0 ? 
1 : rc)); @@ -2313,6 +2341,7 @@ lfsck_namespace_dsd_orphan(const struct lu_env *env, * \param[out] type to tell the caller what the inconsistency is * \param[in] retry if found inconsistency, but the caller does not hold * ldlm lock on the @child, then set @retry as true + * \param[in] unknown set if does not know how to repair the inconsistency * * \retval positive number for repaired cases * \retval 0 if nothing to be repaired @@ -2326,7 +2355,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, struct linkea_data *ldata, struct lustre_handle *lh, enum lfsck_namespace_inconsistency_type *type, - bool *retry) + bool *retry, bool *unknown) { struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; @@ -2339,9 +2368,11 @@ lfsck_namespace_dsd_single(const struct lu_env *env, int rc = 0; ENTRY; - lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key); + rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, + info->lti_key, + sizeof(info->lti_key)); /* The unique linkEA entry with bad parent will be handled as orphan. */ - if (!fid_is_sane(&tfid)) { + if (rc != 0) { if (!lustre_handle_is_used(lh) && retry != NULL) *retry = true; else @@ -2435,7 +2466,7 @@ lost_parent: } GOTO(out, rc); - } + } /* !dt_object_exists(parent) */ /* The unique linkEA entry with bad parent will be handled as orphan. 
*/ if (unlikely(!dt_try_as_dir(env, parent))) { @@ -2449,7 +2480,7 @@ lost_parent: } rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)cname->ln_name, BYPASS_CAPA); + (const struct dt_key *)cname->ln_name); if (rc == -ENOENT) { /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT * has ever tried to verify some remote MDT-object that resides @@ -2521,7 +2552,7 @@ lost_parent: } GOTO(out, rc); - } + } /* rc == -ENOENT */ if (rc != 0) GOTO(out, rc); @@ -2546,8 +2577,18 @@ lost_parent: GOTO(out, rc); } - if (fid_is_zero(pfid)) + /* Zero FID may because the remote directroy object has invalid linkEA, + * or lost linkEA. Under such case, the LFSCK on this MDT does not know + * how to repair the inconsistency, but the namespace LFSCK on the MDT + * where its name entry resides may has more information (name, FID) to + * repair such inconsistency. So here, keep the inconsistency to avoid + * some imporper repairing. */ + if (fid_is_zero(pfid)) { + if (unknown) + *unknown = true; + GOTO(out, rc = 0); + } /* The ".." name entry is wrong, update it. */ if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) { @@ -2589,6 +2630,7 @@ out: * \param[in,out] lh ldlm lock handler for the given @child * \param[out] type to tell the caller what the inconsistency is * \param[in] lpf true if the ".." 
entry is under lost+found/MDTxxxx/ + * \param[in] unknown set if does not know how to repair the inconsistency * * \retval positive number for repaired cases * \retval 0 if nothing to be repaired @@ -2602,7 +2644,7 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env, struct linkea_data *ldata, struct lustre_handle *lh, enum lfsck_namespace_inconsistency_type *type, - bool lpf) + bool lpf, bool *unknown) { struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; @@ -2615,24 +2657,24 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env, struct dt_object *parent = NULL; struct linkea_data ldata_new = { NULL }; int dirent_count = 0; - int linkea_count = 0; int rc = 0; bool once = true; ENTRY; again: while (ldata->ld_lee != NULL) { - lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, - info->lti_key); - /* Drop repeated linkEA entries. */ - lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true); + rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, + info->lti_key, + sizeof(info->lti_key)); /* Drop invalid linkEA entry. */ - if (!fid_is_sane(&tfid)) { - linkea_del_buf(ldata, cname); - linkea_count++; + if (rc != 0) { + lfsck_linkea_del_buf(ldata, cname); continue; } + /* Drop repeated linkEA entries. */ + lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true); + /* If current dotdot is the .lustre/lost+found/MDTxxxx/, * then it is possible that: the directry object has ever * been lost, but its name entry was there. In the former @@ -2646,7 +2688,8 @@ again: * When the LFSCK runs again, if the dangling name is still * there, the LFSCK should move the orphan directory object * back to the normal namespace. 
*/ - if (!lpf && !lu_fid_eq(pfid, &tfid) && once) { + if (!lpf && !fid_is_zero(pfid) && + !lu_fid_eq(pfid, &tfid) && once) { linkea_next_entry(ldata); continue; } @@ -2662,8 +2705,7 @@ again: * there is still other chance to make the * child to be visible via other parent, then * remove this linkEA entry. */ - linkea_del_buf(ldata, cname); - linkea_count++; + lfsck_linkea_del_buf(ldata, cname); continue; } @@ -2673,14 +2715,12 @@ again: /* The linkEA entry with bad parent will be removed. */ if (unlikely(!dt_try_as_dir(env, parent))) { lfsck_object_put(env, parent); - linkea_del_buf(ldata, cname); - linkea_count++; + lfsck_linkea_del_buf(ldata, cname); continue; } rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, - (const struct dt_key *)cname->ln_name, - BYPASS_CAPA); + (const struct dt_key *)cname->ln_name); *pfid2 = *lfsck_dto2fid(parent); if (rc == -ENOENT) { lfsck_object_put(env, parent); @@ -2696,7 +2736,11 @@ again: if (lu_fid_eq(&tfid, cfid)) { lfsck_object_put(env, parent); - if (!lu_fid_eq(pfid, pfid2)) { + /* If the parent (that is declared via linkEA entry) + * directory contains the specified child, but such + * parent does not match the dotdot name entry, then + * trust the linkEA. */ + if (!fid_is_zero(pfid) && !lu_fid_eq(pfid, pfid2)) { *type = LNIT_UNMATCHED_PAIRS; rc = lfsck_namespace_repair_unmatched_pairs(env, com, child, pfid2, cname); @@ -2708,11 +2752,8 @@ rebuild: /* It is the most common case that we find the * name entry corresponding to the linkEA entry * that matches the ".." name entry. 
*/ - rc = linkea_data_new(&ldata_new, &info->lti_big_buf); - if (rc != 0) - RETURN(rc); - - rc = linkea_add_buf(&ldata_new, cname, pfid2); + rc = linkea_links_new(&ldata_new, &info->lti_big_buf, + cname, pfid2); if (rc != 0) RETURN(rc); @@ -2721,15 +2762,15 @@ rebuild: if (rc < 0) RETURN(rc); - linkea_del_buf(ldata, cname); - linkea_count++; + lfsck_linkea_del_buf(ldata, cname); linkea_first_entry(ldata); /* There may be some invalid dangling name entries under * other parent directories, remove all of them. */ while (ldata->ld_lee != NULL) { - lfsck_namespace_unpack_linkea_entry(ldata, - cname, &tfid, info->lti_key); - if (!fid_is_sane(&tfid)) + rc = lfsck_namespace_unpack_linkea_entry(ldata, + cname, &tfid, info->lti_key, + sizeof(info->lti_key)); + if (rc != 0) goto next; parent = lfsck_object_find_bottom(env, lfsck, @@ -2762,13 +2803,13 @@ rebuild: dirent_count += rc; next: - linkea_del_buf(ldata, cname); + lfsck_linkea_del_buf(ldata, cname); } ns->ln_dirent_repaired += dirent_count; RETURN(rc); - } + } /* lu_fid_eq(&tfid, lfsck_dto2fid(child)) */ lfsck_ibits_unlock(lh, LCK_EX); /* The name entry references another MDT-object that may be @@ -2783,17 +2824,17 @@ next: if (rc > 0) goto rebuild; - linkea_del_buf(ldata, cname); - } + lfsck_linkea_del_buf(ldata, cname); + } /* while (ldata->ld_lee != NULL) */ + + /* If there is still linkEA overflow, return. 
*/ + if (unlikely(ldata->ld_leh->leh_overflow_time)) + RETURN(0); linkea_first_entry(ldata); if (ldata->ld_leh->leh_reccount == 1) { rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata, - lh, type, NULL); - - if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0) - rc = lfsck_namespace_rebuild_linkea(env, com, child, - ldata); + lh, type, NULL, unknown); RETURN(rc); } @@ -2831,7 +2872,7 @@ next: * * If all the known name entries have been verified, then the object's hard * link attribute should match the object's linkEA entries count unless the - * object's has too much hard link to be recorded in the linkEA. Such cases + * object's has too many hard link to be recorded in the linkEA. Such cases * should have been marked in the LFSCK trace file. Otherwise, trust the * linkEA to update the object's nlink attribute. * @@ -2850,32 +2891,21 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, struct dt_object *obj, struct lu_attr *la) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_fid *tfid = &info->lti_fid3; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(obj); const struct lu_fid *cfid = lfsck_dto2fid(obj); - struct dt_object *child = NULL; struct thandle *th = NULL; struct linkea_data ldata = { NULL }; struct lustre_handle lh = { 0 }; __u32 old = la->la_nlink; - int idx; int rc = 0; - __u8 flags; ENTRY; LASSERT(!dt_object_remote(obj)); - LASSERT(S_ISREG(lfsck_object_type(obj))); - - child = lfsck_object_find_by_dev(env, dev, cfid); - if (IS_ERR(child)) - GOTO(log, rc = PTR_ERR(child)); - rc = lfsck_ibits_lock(env, lfsck, child, &lh, - MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + rc = lfsck_ibits_lock(env, lfsck, obj, &lh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -2884,7 +2914,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, GOTO(log, rc 
= PTR_ERR(th)); la->la_valid = LA_NLINK; - rc = dt_declare_attr_set(env, child, la, th); + rc = dt_declare_attr_set(env, obj, la, th); if (rc != 0) GOTO(stop, rc); @@ -2892,7 +2922,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - dt_write_lock(env, child, 0); + dt_write_lock(env, obj, 0); /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has * ever tried to verify some remote MDT-object that resides on this * MDT, but this MDT failed to respond such request. So means there @@ -2903,48 +2933,38 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, if (ns->ln_flags & LF_INCOMPLETE) GOTO(unlock, rc = 0); - fid_cpu_to_be(tfid, cfid); - idx = lfsck_sub_trace_file_fid2idx(cfid); - rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj, - (struct dt_rec *)&flags, (const struct dt_key *)tfid, - BYPASS_CAPA); - if (rc != 0) - GOTO(unlock, rc); - - if (flags & LNTF_SKIP_NLINK) - GOTO(unlock, rc = 0); - - rc = dt_attr_get(env, child, la, BYPASS_CAPA); + rc = dt_attr_get(env, obj, la); if (rc != 0) GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc)); - rc = lfsck_links_read2(env, child, &ldata); - if (rc != 0) + rc = lfsck_links_read2_with_rec(env, obj, &ldata); + if (rc) GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc)); - if (la->la_nlink == ldata.ld_leh->leh_reccount || - unlikely(la->la_nlink == 0)) + /* XXX: Currently, we only update the nlink attribute if the known + * linkEA entries is larger than the nlink attribute. That is + * safe action. */ + if (la->la_nlink >= ldata.ld_leh->leh_reccount || + unlikely(la->la_nlink == 0 || + ldata.ld_leh->leh_overflow_time)) GOTO(unlock, rc = 0); la->la_nlink = ldata.ld_leh->leh_reccount; if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 1); - rc = dt_attr_set(env, child, la, th, BYPASS_CAPA); + rc = dt_attr_set(env, obj, la, th); GOTO(unlock, rc = (rc == 0 ? 
1 : rc)); unlock: - dt_write_unlock(env, child); + dt_write_unlock(env, obj); stop: dt_trans_stop(env, dev, th); log: - lfsck_ibits_unlock(&lh, LCK_EX); - if (child != NULL && !IS_ERR(child)) - lfsck_object_put(env, child); - + lfsck_ibits_unlock(&lh, LCK_PW); CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s " "nlink count from %u to %u: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc); @@ -3063,7 +3083,7 @@ lock: } rc = dt_lookup(env, child, (struct dt_rec *)pfid, - (const struct dt_key *)dotdot, BYPASS_CAPA); + (const struct dt_key *)dotdot); if (rc != 0) { if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) { dt_read_unlock(env, child); @@ -3137,13 +3157,13 @@ lock: } GOTO(out, rc); - } + } /* rc != 0 */ linkea_first_entry(&ldata); /* This is the most common case: the object has unique linkEA entry. */ if (ldata.ld_leh->leh_reccount == 1) { rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata, - &lh, &type, &retry); + &lh, &type, &retry, &unknown); if (retry) { LASSERT(!lustre_handle_is_used(&lh)); @@ -3175,7 +3195,7 @@ lock: * but the LFSCK cannot aware that at that time, then it adds * the bad linkEA entry for further processing. */ rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata, - &lh, &type, lpf); + &lh, &type, lpf, &unknown); GOTO(out, rc); @@ -3203,6 +3223,117 @@ out: return rc; } +#define lfsck_time_before(a, b) \ + (typecheck(__u32, a) && \ + typecheck(__u32, b) && \ + ((int)(a) - (int)(b) < 0)) + +static inline bool +lfsck_namespace_linkea_stale_overflow(struct linkea_data *ldata, + struct lfsck_namespace *ns) +{ + /* Both the leh_overflow_time and ln_time_latest_reset are + * local time based, so need NOT to care about clock drift + * among the servers. */ + return ldata->ld_leh->leh_overflow_time && + lfsck_time_before(ldata->ld_leh->leh_overflow_time, + ns->ln_time_latest_reset); +} + +/** + * Clear the object's linkEA overflow timestamp. 
+ * + * If the MDT-object has too many hard links as to the linkEA cannot hold + * all of them, then overflow timestamp will be set in the linkEA header. + * If some hard links are removed after that, then it is possible to hold + * other missed linkEA entries. If the namespace LFSCK have added all the + * related linkEA entries, then it will remove the overflow timestamp. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] ldata pointer to the linkEA data for the given @obj + * \param[in] obj pointer to the dt_object to be handled + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env, + struct lfsck_component *com, + struct linkea_data *ldata, + struct dt_object *obj) +{ + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck_obj2dev(obj); + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + struct lu_buf linkea_buf; + int rc = 0; + ENTRY; + + LASSERT(!dt_object_remote(obj)); + + rc = lfsck_ibits_lock(env, lfsck, obj, &lh, + MDS_INODELOCK_UPDATE, LCK_PW); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(log, rc = PTR_ERR(th)); + + rc = dt_declare_xattr_set(env, obj, + lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE), + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, obj, 0); + rc = lfsck_links_read(env, obj, ldata); + if (rc != 0) + GOTO(unlock, rc); + + if (unlikely(!lfsck_namespace_linkea_stale_overflow(ldata, ns))) + GOTO(unlock, rc = 0); + + ldata->ld_leh->leh_overflow_time = 0; + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(unlock, rc = 1); + + /* If all known entries are in the linkEA, 
then the 'leh_reccount' + * should NOT be zero. */ + LASSERT(ldata->ld_leh->leh_reccount > 0); + + lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, + ldata->ld_leh->leh_len); + rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, 0, th); + if (unlikely(rc == -ENOSPC)) + rc = 0; + else if (!rc) + rc = 1; + + GOTO(unlock, rc); + +unlock: + dt_write_unlock(env, obj); + +stop: + dt_trans_stop(env, dev, th); + +log: + lfsck_ibits_unlock(&lh, LCK_PW); + CDEBUG(D_LFSCK, "%s: clear linkea overflow timestamp for the object " + DFID": rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc); + + return rc; +} + /** * Double scan the MDT-object for namespace LFSCK. * @@ -3256,41 +3387,68 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, rc = lfsck_links_read(env, child, &ldata); dt_read_unlock(env, child); + + if (rc == -EINVAL) { + struct lustre_handle lh = { 0 }; + + rc = lfsck_ibits_lock(env, com->lc_lfsck, child, &lh, + MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc == 0) { + rc = lfsck_namespace_links_remove(env, com, child); + lfsck_ibits_unlock(&lh, LCK_EX); + } + + GOTO(out, rc); + } + if (rc != 0) GOTO(out, rc); + if (!(ns->ln_flags & LF_INCOMPLETE) && + unlikely(lfsck_namespace_linkea_stale_overflow(&ldata, ns))) { + rc = lfsck_namespace_linkea_clear_overflow(env, com, &ldata, + child); + if (rc < 0) + GOTO(out, rc); + + if (rc > 0) + ns->ln_linkea_overflow_cleared++; + } + linkea_first_entry(&ldata); while (ldata.ld_lee != NULL) { - lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid, - info->lti_key); - rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid, - false); - /* Found repeated linkEA entries */ - if (rc > 0) { + rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid, + info->lti_key, + sizeof(info->lti_key)); + /* Invalid PFID in the linkEA entry. 
*/ + if (rc != 0) { rc = lfsck_namespace_shrink_linkea(env, com, child, - &ldata, cname, pfid, false); + &ldata, cname, pfid, true); if (rc < 0) GOTO(out, rc); - if (rc == 0) - continue; - - repaired = true; + if (rc > 0) + repaired = true; - /* fall through */ + continue; } - /* Invalid PFID in the linkEA entry. */ - if (!fid_is_sane(pfid)) { + rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid, + false); + /* Found repeated linkEA entries */ + if (rc > 0) { rc = lfsck_namespace_shrink_linkea(env, com, child, - &ldata, cname, pfid, true); + &ldata, cname, pfid, false); if (rc < 0) GOTO(out, rc); - if (rc > 0) - repaired = true; + if (rc == 0) + continue; - continue; + repaired = true; + + /* fall through */ } parent = lfsck_object_find_bottom(env, lfsck, pfid); @@ -3363,7 +3521,7 @@ lost_parent: repaired = true; continue; - } + } /* !dt_object_exists(parent) */ /* The linkEA entry with bad parent will be removed. */ if (unlikely(!dt_try_as_dir(env, parent))) { @@ -3380,8 +3538,7 @@ lost_parent: } rc = dt_lookup(env, parent, (struct dt_rec *)cfid, - (const struct dt_key *)cname->ln_name, - BYPASS_CAPA); + (const struct dt_key *)cname->ln_name); if (rc != 0 && rc != -ENOENT) { lfsck_object_put(env, parent); @@ -3424,12 +3581,14 @@ lost_parent: continue; } - rc = dt_attr_get(env, child, la, BYPASS_CAPA); + /* The following handles -ENOENT case */ + + rc = dt_attr_get(env, child, la); if (rc != 0) GOTO(out, rc); /* If there is no name entry in the parent dir and the object - * link count is less than the linkea entries count, then the + * link count is fewer than the linkea entries count, then the * linkea entry should be removed. 
*/ if (ldata.ld_leh->leh_reccount > la->la_nlink) { rc = lfsck_namespace_shrink_linkea_cond(env, com, @@ -3519,11 +3678,8 @@ out: if (rc < 0 && rc != -ENODATA) return rc; - if (rc == 0) { - LASSERT(ldata.ld_leh != NULL); - + if (rc == 0 && ldata.ld_leh != NULL) count = ldata.ld_leh->leh_reccount; - } if (count == 0) { /* If the LFSCK is marked as LF_INCOMPLETE, then means some @@ -3533,7 +3689,9 @@ out: * other MDT that references this object with another name, * so we cannot know whether this linkEA is valid or not. * So keep it there and maybe resolved when next LFSCK run. */ - if (!(ns->ln_flags & LF_INCOMPLETE)) { + if (!(ns->ln_flags & LF_INCOMPLETE) && + (ldata.ld_leh == NULL || + !ldata.ld_leh->leh_overflow_time)) { /* If the child becomes orphan, then insert it into * the global .lustre/lost+found/MDTxxxx directory. */ rc = lfsck_namespace_insert_orphan(env, com, child, @@ -3547,15 +3705,29 @@ out: } } } else { - rc = dt_attr_get(env, child, la, BYPASS_CAPA); + rc = dt_attr_get(env, child, la); if (rc != 0) return rc; if (la->la_nlink != 0 && la->la_nlink != count) { - rc = lfsck_namespace_repair_nlink(env, com, child, la); - if (rc > 0) { - ns->ln_objs_nlink_repaired++; - rc = 0; + if (unlikely(!S_ISREG(lfsck_object_type(child)) && + !S_ISLNK(lfsck_object_type(child)))) { + CDEBUG(D_LFSCK, "%s: namespace LFSCK finds " + "the object "DFID"'s nlink count %d " + "does not match linkEA count %d, " + "type %o, skip it.\n", + lfsck_lfsck2name(lfsck), + PFID(lfsck_dto2fid(child)), + la->la_nlink, count, + lfsck_object_type(child)); + } else if (la->la_nlink < count && + likely(!ldata.ld_leh->leh_overflow_time)) { + rc = lfsck_namespace_repair_nlink(env, com, + child, la); + if (rc > 0) { + ns->ln_objs_nlink_repaired++; + rc = 0; + } } } } @@ -3578,76 +3750,78 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, __u32 time_phase1, __u32 time_phase2) { - seq_printf(m, "checked_phase1: "LPU64"\n" - "checked_phase2: "LPU64"\n" - "updated_phase1: 
"LPU64"\n" - "updated_phase2: "LPU64"\n" - "failed_phase1: "LPU64"\n" - "failed_phase2: "LPU64"\n" - "directories: "LPU64"\n" - "dirent_repaired: "LPU64"\n" - "linkea_repaired: "LPU64"\n" - "nlinks_repaired: "LPU64"\n" - "multiple_linked_checked: "LPU64"\n" - "multiple_linked_repaired: "LPU64"\n" - "unknown_inconsistency: "LPU64"\n" - "unmatched_pairs_repaired: "LPU64"\n" - "dangling_repaired: "LPU64"\n" - "multiple_referenced_repaired: "LPU64"\n" - "bad_file_type_repaired: "LPU64"\n" - "lost_dirent_repaired: "LPU64"\n" - "local_lost_found_scanned: "LPU64"\n" - "local_lost_found_moved: "LPU64"\n" - "local_lost_found_skipped: "LPU64"\n" - "local_lost_found_failed: "LPU64"\n" - "striped_dirs_scanned: "LPU64"\n" - "striped_dirs_repaired: "LPU64"\n" - "striped_dirs_failed: "LPU64"\n" - "striped_dirs_disabled: "LPU64"\n" - "striped_dirs_skipped: "LPU64"\n" - "striped_shards_scanned: "LPU64"\n" - "striped_shards_repaired: "LPU64"\n" - "striped_shards_failed: "LPU64"\n" - "striped_shards_skipped: "LPU64"\n" - "name_hash_repaired: "LPU64"\n" - "success_count: %u\n" - "run_time_phase1: %u seconds\n" - "run_time_phase2: %u seconds\n", - checked_phase1, - checked_phase2, - ns->ln_items_repaired, - ns->ln_objs_repaired_phase2, - ns->ln_items_failed, - ns->ln_objs_failed_phase2, - ns->ln_dirs_checked, - ns->ln_dirent_repaired, - ns->ln_linkea_repaired, - ns->ln_objs_nlink_repaired, - ns->ln_mul_linked_checked, - ns->ln_mul_linked_repaired, - ns->ln_unknown_inconsistency, - ns->ln_unmatched_pairs_repaired, - ns->ln_dangling_repaired, - ns->ln_mul_ref_repaired, - ns->ln_bad_type_repaired, - ns->ln_lost_dirent_repaired, - ns->ln_local_lpf_scanned, - ns->ln_local_lpf_moved, - ns->ln_local_lpf_skipped, - ns->ln_local_lpf_failed, - ns->ln_striped_dirs_scanned, - ns->ln_striped_dirs_repaired, - ns->ln_striped_dirs_failed, - ns->ln_striped_dirs_disabled, - ns->ln_striped_dirs_skipped, - ns->ln_striped_shards_scanned, - ns->ln_striped_shards_repaired, - ns->ln_striped_shards_failed, - 
ns->ln_striped_shards_skipped, - ns->ln_name_hash_repaired, - ns->ln_success_count, - time_phase1, - time_phase2); + seq_printf(m, "checked_phase1: %llu\n" + "checked_phase2: %llu\n" + "updated_phase1: %llu\n" + "updated_phase2: %llu\n" + "failed_phase1: %llu\n" + "failed_phase2: %llu\n" + "directories: %llu\n" + "dirent_repaired: %llu\n" + "linkea_repaired: %llu\n" + "nlinks_repaired: %llu\n" + "multiple_linked_checked: %llu\n" + "multiple_linked_repaired: %llu\n" + "unknown_inconsistency: %llu\n" + "unmatched_pairs_repaired: %llu\n" + "dangling_repaired: %llu\n" + "multiple_referenced_repaired: %llu\n" + "bad_file_type_repaired: %llu\n" + "lost_dirent_repaired: %llu\n" + "local_lost_found_scanned: %llu\n" + "local_lost_found_moved: %llu\n" + "local_lost_found_skipped: %llu\n" + "local_lost_found_failed: %llu\n" + "striped_dirs_scanned: %llu\n" + "striped_dirs_repaired: %llu\n" + "striped_dirs_failed: %llu\n" + "striped_dirs_disabled: %llu\n" + "striped_dirs_skipped: %llu\n" + "striped_shards_scanned: %llu\n" + "striped_shards_repaired: %llu\n" + "striped_shards_failed: %llu\n" + "striped_shards_skipped: %llu\n" + "name_hash_repaired: %llu\n" + "linkea_overflow_cleared: %llu\n" + "success_count: %u\n" + "run_time_phase1: %u seconds\n" + "run_time_phase2: %u seconds\n", + checked_phase1, + checked_phase2, + ns->ln_items_repaired, + ns->ln_objs_repaired_phase2, + ns->ln_items_failed, + ns->ln_objs_failed_phase2, + ns->ln_dirs_checked, + ns->ln_dirent_repaired, + ns->ln_linkea_repaired, + ns->ln_objs_nlink_repaired, + ns->ln_mul_linked_checked, + ns->ln_mul_linked_repaired, + ns->ln_unknown_inconsistency, + ns->ln_unmatched_pairs_repaired, + ns->ln_dangling_repaired, + ns->ln_mul_ref_repaired, + ns->ln_bad_type_repaired, + ns->ln_lost_dirent_repaired, + ns->ln_local_lpf_scanned, + ns->ln_local_lpf_moved, + ns->ln_local_lpf_skipped, + ns->ln_local_lpf_failed, + ns->ln_striped_dirs_scanned, + ns->ln_striped_dirs_repaired, + ns->ln_striped_dirs_failed, + 
ns->ln_striped_dirs_disabled, + ns->ln_striped_dirs_skipped, + ns->ln_striped_shards_scanned, + ns->ln_striped_shards_repaired, + ns->ln_striped_shards_failed, + ns->ln_striped_shards_skipped, + ns->ln_name_hash_repaired, + ns->ln_linkea_overflow_cleared, + ns->ln_success_count, + time_phase1, + time_phase2); } static void lfsck_namespace_release_lmv(const struct lu_env *env, @@ -3680,7 +3854,7 @@ static int lfsck_namespace_check_for_double_scan(const struct lu_env *env, struct lu_attr *la = &lfsck_env_info(env)->lti_la; int rc; - rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + rc = dt_attr_get(env, obj, la); if (rc != 0) return rc; @@ -3709,7 +3883,6 @@ static int lfsck_namespace_reset(const struct lu_env *env, struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_assistant_data *lad = com->lc_data; struct dt_object *root; - struct dt_object *dto; int rc; ENTRY; @@ -3733,23 +3906,23 @@ static int lfsck_namespace_reset(const struct lu_env *env, } ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; + ns->ln_time_latest_reset = cfs_time_current_sec(); - lfsck_object_put(env, com->lc_obj); - com->lc_obj = NULL; - dto = lfsck_namespace_load_one_trace_file(env, com, root, - LFSCK_NAMESPACE, NULL, true); - if (IS_ERR(dto)) - GOTO(out, rc = PTR_ERR(dto)); + rc = lfsck_load_one_trace_file(env, com, root, &com->lc_obj, + &dt_lfsck_namespace_features, + LFSCK_NAMESPACE, true); + if (rc) + GOTO(out, rc); - com->lc_obj = dto; - rc = lfsck_namespace_load_sub_trace_files(env, com, true); + rc = lfsck_load_sub_trace_files(env, com, &dt_lfsck_namespace_features, + LFSCK_NAMESPACE, true); if (rc != 0) GOTO(out, rc); lad->lad_incomplete = 0; CFS_RESET_BITMAP(lad->lad_bitmap); - rc = lfsck_namespace_store(env, com, true); + rc = lfsck_namespace_store(env, com); GOTO(out, rc); @@ -3757,7 +3930,7 @@ out: up_write(&com->lc_sem); put: - lu_object_put(env, &root->do_lu); + lfsck_object_put(env, root); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n", 
lfsck_lfsck2name(lfsck), rc); @@ -3782,6 +3955,7 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, { struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_assistant_object *lso = NULL; struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_lmv *llmv = lfsck->li_lmv; struct lfsck_namespace_req *lnr; @@ -3800,13 +3974,21 @@ static void lfsck_namespace_close_dir(const struct lu_env *env, RETURN_EXIT; } + lso = lfsck_assistant_object_init(env, lfsck_dto2fid(lfsck->li_obj_dir), + NULL, lfsck->li_pos_current.lp_oit_cookie, true); + if (IS_ERR(lso)) { + OBD_FREE(lnr, size); + ns->ln_striped_dirs_skipped++; + + RETURN_EXIT; + } + /* Generate a dummy request to indicate that all shards' name entry * in this striped directory has been scanned for the first time. */ INIT_LIST_HEAD(&lnr->lnr_lar.lar_list); - lnr->lnr_lar.lar_fid = *lfsck_dto2fid(lfsck->li_obj_dir); + lnr->lnr_lar.lar_parent = lso; lnr->lnr_lmv = lfsck_lmv_get(llmv); lnr->lnr_fid = *lfsck_dto2fid(lfsck->li_obj_dir); - lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie; lnr->lnr_dir_cookie = MDS_DIR_END_OFF; lnr->lnr_size = size; @@ -3846,10 +4028,9 @@ static int lfsck_namespace_open_dir(const struct lu_env *env, if (llmv->ll_lmv_master) { struct lmv_mds_md_v1 *lmv = &llmv->ll_lmv; - if (lmv->lmv_master_mdt_index != - lfsck_dev_idx(lfsck->li_bottom)) { + if (lmv->lmv_master_mdt_index != lfsck_dev_idx(lfsck)) { lmv->lmv_master_mdt_index = - lfsck_dev_idx(lfsck->li_bottom); + lfsck_dev_idx(lfsck); ns->ln_flags |= LF_INCONSISTENT; llmv->ll_lmv_updated = 1; } @@ -3886,15 +4067,15 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); log: - CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64 - ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck), - 
lfsck->li_pos_current.lp_oit_cookie, + CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos [%llu" + ", "DFID", %#llx], status = %d: rc = %d\n", + lfsck_lfsck2name(lfsck), lfsck->li_pos_current.lp_oit_cookie, PFID(&lfsck->li_pos_current.lp_dir_parent), - lfsck->li_pos_current.lp_dir_cookie, rc); + lfsck->li_pos_current.lp_dir_cookie, ns->ln_status, rc); return rc > 0 ? 0 : rc; } @@ -3992,8 +4173,8 @@ static int lfsck_namespace_prep(const struct lu_env *env, rc = lfsck_start_assistant(env, com, lsp); - CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", " - DFID", "LPX64"]: rc = %d\n", + CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos [%llu, " + DFID", %#llx]: rc = %d\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie, PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc); @@ -4011,11 +4192,9 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, struct lu_fid *pfid = &info->lti_fid2; struct lu_name *cname = &info->lti_name; struct lu_seq_range *range = &info->lti_range; - struct dt_device *dev = lfsck->li_bottom; - struct seq_server_site *ss = - lu_site2seq(dev->dd_lu_dev.ld_site); + struct seq_server_site *ss = lfsck_dev_site(lfsck); struct linkea_data ldata = { NULL }; - __u32 idx = lfsck_dev_idx(dev); + __u32 idx = lfsck_dev_idx(lfsck); int rc; ENTRY; @@ -4043,7 +4222,7 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, GOTO(out, rc = (rc == -ENOENT ? 
0 : rc)); } - if (rc == -ENODATA) { + if (rc == -ENODATA || unlikely(!ldata.ld_leh->leh_reccount)) { rc = lfsck_namespace_check_for_double_scan(env, com, obj); GOTO(out, rc); @@ -4082,7 +4261,6 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, out: down_write(&com->lc_sem); - com->lc_new_checked++; if (S_ISDIR(lfsck_object_type(obj))) ns->ln_dirs_checked++; if (rc != 0) @@ -4094,6 +4272,7 @@ out: static int lfsck_namespace_exec_dir(const struct lu_env *env, struct lfsck_component *com, + struct lfsck_assistant_object *lso, struct lu_dirent *ent, __u16 type) { struct lfsck_assistant_data *lad = com->lc_data; @@ -4106,7 +4285,6 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, bool wakeup = false; l_wait_event(mthread->t_ctl_waitq, - bk->lb_async_windows == 0 || lad->lad_prefetched < bk->lb_async_windows || !thread_is_running(mthread) || thread_is_stopped(athread), @@ -4119,7 +4297,7 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir))) return 0; - lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type); + lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, lso, ent, type); if (IS_ERR(lnr)) { struct lfsck_namespace *ns = com->lc_file_ram; @@ -4197,7 +4375,7 @@ static int lfsck_namespace_post(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n", @@ -4206,14 +4384,13 @@ static int lfsck_namespace_post(const struct lu_env *env, RETURN(rc); } -static int +static void lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, struct seq_file *m) { struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct lfsck_namespace *ns = com->lc_file_ram; - int rc; down_read(&com->lc_sem); seq_printf(m, "name: lfsck_namespace\n" @@ -4222,45 
+4399,25 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, "status: %s\n", ns->ln_magic, bk->lb_version, - lfsck_status2names(ns->ln_status)); + lfsck_status2name(ns->ln_status)); - rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags"); - if (rc < 0) - goto out; + lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags"); - rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param"); - if (rc < 0) - goto out; + lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param"); - rc = lfsck_time_dump(m, ns->ln_time_last_complete, - "time_since_last_completed"); - if (rc < 0) - goto out; + lfsck_time_dump(m, ns->ln_time_last_complete, "last_completed"); - rc = lfsck_time_dump(m, ns->ln_time_latest_start, - "time_since_latest_start"); - if (rc < 0) - goto out; + lfsck_time_dump(m, ns->ln_time_latest_start, "latest_start"); - rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint, - "time_since_last_checkpoint"); - if (rc < 0) - goto out; + lfsck_time_dump(m, ns->ln_time_last_checkpoint, "last_checkpoint"); - rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start, - "latest_start_position"); - if (rc < 0) - goto out; + lfsck_pos_dump(m, &ns->ln_pos_latest_start, "latest_start_position"); - rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint, - "last_checkpoint_position"); - if (rc < 0) - goto out; + lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint, + "last_checkpoint_position"); - rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent, - "first_failure_position"); - if (rc < 0) - goto out; + lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent, + "first_failure_position"); if (ns->ln_status == LS_SCANNING_PHASE1) { struct lfsck_position pos; @@ -4276,18 +4433,19 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, if (duration != 0) do_div(new_checked, duration); + if (rtime != 0) do_div(speed, rtime); - lfsck_namespace_dump_statistics(m, ns, checked, - ns->ln_objs_checked_phase2, - rtime, ns->ln_run_time_phase2); - 
seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n" - "average_speed_phase2: N/A\n" - "real_time_speed_phase1: "LPU64" items/sec\n" - "real_time_speed_phase2: N/A\n", - speed, - new_checked); + lfsck_namespace_dump_statistics(m, ns, checked, 0, rtime, 0); + seq_printf(m, "average_speed_phase1: %llu items/sec\n" + "average_speed_phase2: N/A\n" + "average_speed_total: %llu items/sec\n" + "real_time_speed_phase1: %llu items/sec\n" + "real_time_speed_phase2: N/A\n", + speed, + speed, + new_checked); LASSERT(lfsck->li_di_oit != NULL); @@ -4319,59 +4477,83 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, lfsck_pos_dump(m, &pos, "current_position"); } else if (ns->ln_status == LS_SCANNING_PHASE2) { cfs_duration_t duration = cfs_time_current() - - lfsck->li_time_last_checkpoint; + com->lc_time_last_checkpoint; __u64 checked = ns->ln_objs_checked_phase2 + com->lc_new_checked; __u64 speed1 = ns->ln_items_checked; __u64 speed2 = checked; + __u64 speed0 = speed1 + speed2; __u64 new_checked = com->lc_new_checked * msecs_to_jiffies(MSEC_PER_SEC); __u32 rtime = ns->ln_run_time_phase2 + cfs_duration_sec(duration + HALF_SEC); + __u32 time0 = ns->ln_run_time_phase1 + rtime; if (duration != 0) do_div(new_checked, duration); + if (ns->ln_run_time_phase1 != 0) do_div(speed1, ns->ln_run_time_phase1); + else if (ns->ln_items_checked != 0) + time0++; + if (rtime != 0) do_div(speed2, rtime); + else if (checked != 0) + time0++; + + if (time0 != 0) + do_div(speed0, time0); + lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked, checked, ns->ln_run_time_phase1, rtime); - - seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n" - "average_speed_phase2: "LPU64" objs/sec\n" - "real_time_speed_phase1: N/A\n" - "real_time_speed_phase2: "LPU64" objs/sec\n" - "current_position: "DFID"\n", - speed1, - speed2, - new_checked, - PFID(&ns->ln_fid_latest_scanned_phase2)); + seq_printf(m, "average_speed_phase1: %llu items/sec\n" + "average_speed_phase2: %llu 
objs/sec\n" + "average_speed_total: %llu items/sec\n" + "real_time_speed_phase1: N/A\n" + "real_time_speed_phase2: %llu objs/sec\n" + "current_position: "DFID"\n", + speed1, + speed2, + speed0, + new_checked, + PFID(&ns->ln_fid_latest_scanned_phase2)); } else { __u64 speed1 = ns->ln_items_checked; __u64 speed2 = ns->ln_objs_checked_phase2; + __u64 speed0 = speed1 + speed2; + __u32 time0 = ns->ln_run_time_phase1 + ns->ln_run_time_phase2; if (ns->ln_run_time_phase1 != 0) do_div(speed1, ns->ln_run_time_phase1); + else if (ns->ln_items_checked != 0) + time0++; + if (ns->ln_run_time_phase2 != 0) do_div(speed2, ns->ln_run_time_phase2); + else if (ns->ln_objs_checked_phase2 != 0) + time0++; + + if (time0 != 0) + do_div(speed0, time0); + lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked, ns->ln_objs_checked_phase2, ns->ln_run_time_phase1, ns->ln_run_time_phase2); - - seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n" - "average_speed_phase2: "LPU64" objs/sec\n" - "real_time_speed_phase1: N/A\n" - "real_time_speed_phase2: N/A\n" - "current_position: N/A\n", - speed1, - speed2); + seq_printf(m, "average_speed_phase1: %llu items/sec\n" + "average_speed_phase2: %llu objs/sec\n" + "average_speed_total: %llu items/sec\n" + "real_time_speed_phase1: N/A\n" + "real_time_speed_phase2: N/A\n" + "current_position: N/A\n", + speed1, + speed2, + speed0); } -out: + up_read(&com->lc_sem); - return 0; } static int lfsck_namespace_double_scan(const struct lu_env *env, @@ -4477,99 +4659,22 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, struct lfsck_assistant_data *lad = com->lc_data; struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; struct lfsck_tgt_desc *ltd; - int rc; + int rc = 0; bool fail = false; ENTRY; switch (lr->lr_event) { - case LE_SKIP_NLINK_DECLARE: { - struct dt_object *obj; - struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; - int idx; - __u8 flags = 0; - - LASSERT(th != NULL); - - idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); - obj 
= com->lc_sub_trace_objs[idx].lsto_obj; - fid_cpu_to_be(key, &lr->lr_fid); - mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); - rc = dt_declare_delete(env, obj, - (const struct dt_key *)key, th); - if (rc == 0) - rc = dt_declare_insert(env, obj, - (const struct dt_rec *)&flags, - (const struct dt_key *)key, th); - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - - RETURN(rc); - } - case LE_SKIP_NLINK: { - struct dt_object *obj; - struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; - int idx; - __u8 flags = 0; - bool exist = false; - ENTRY; - - LASSERT(th != NULL); - - idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); - obj = com->lc_sub_trace_objs[idx].lsto_obj; - fid_cpu_to_be(key, &lr->lr_fid); - mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); - rc = dt_lookup(env, obj, (struct dt_rec *)&flags, - (const struct dt_key *)key, BYPASS_CAPA); - if (rc == 0) { - if (flags & LNTF_SKIP_NLINK) { - mutex_unlock( - &com->lc_sub_trace_objs[idx].lsto_mutex); - - RETURN(0); - } - - exist = true; - } else if (rc != -ENOENT) { - GOTO(log, rc); - } - - flags |= LNTF_SKIP_NLINK; - if (exist) { - rc = dt_delete(env, obj, (const struct dt_key *)key, - th, BYPASS_CAPA); - if (rc != 0) - GOTO(log, rc); - } - - rc = dt_insert(env, obj, (const struct dt_rec *)&flags, - (const struct dt_key *)key, th, BYPASS_CAPA, 1); - - GOTO(log, rc); - -log: - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID - " to be skipped for namespace double scan: rc = %d\n", - lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc); - - if (rc != 0) - /* If we cannot record this object in the LFSCK tracing, - * we have to mark the LFSC as LF_INCOMPLETE, then the - * LFSCK will skip nlink attribute verification for - * all objects. 
*/ - ns->ln_flags |= LF_INCOMPLETE; - - return 0; - } case LE_SET_LMV_MASTER: { struct dt_object *obj; - obj = lfsck_object_find_by_dev(env, lfsck->li_bottom, - &lr->lr_fid); + obj = lfsck_object_find_bottom(env, lfsck, &lr->lr_fid); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); - rc = lfsck_namespace_notify_lmv_master_local(env, com, obj); + if (likely(dt_object_exists(obj))) + rc = lfsck_namespace_notify_lmv_master_local(env, com, + obj); + lfsck_object_put(env, obj); RETURN(rc > 0 ? 0 : rc); @@ -4596,7 +4701,7 @@ log: lr->lr_index, lr->lr_status, lr->lr_flags2); spin_lock(<ds->ltd_lock); - ltd = LTD_TGT(ltds, lr->lr_index); + ltd = lfsck_ltd2tgt(ltds, lr->lr_index); if (ltd == NULL) { spin_unlock(<ds->ltd_lock); @@ -4661,12 +4766,81 @@ log: RETURN(0); } +static void lfsck_namespace_repaired(struct lfsck_namespace *ns, __u64 *count) +{ + *count += ns->ln_objs_nlink_repaired; + *count += ns->ln_dirent_repaired; + *count += ns->ln_linkea_repaired; + *count += ns->ln_mul_linked_repaired; + *count += ns->ln_unmatched_pairs_repaired; + *count += ns->ln_dangling_repaired; + *count += ns->ln_mul_ref_repaired; + *count += ns->ln_bad_type_repaired; + *count += ns->ln_lost_dirent_repaired; + *count += ns->ln_striped_dirs_disabled; + *count += ns->ln_striped_dirs_repaired; + *count += ns->ln_striped_shards_repaired; + *count += ns->ln_name_hash_repaired; + *count += ns->ln_local_lpf_moved; +} + +static int lfsck_namespace_query_all(const struct lu_env *env, + struct lfsck_component *com, + __u32 *mdts_count, __u64 *repaired) +{ + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + int idx; + int rc; + ENTRY; + + rc = lfsck_query_all(env, com); + if (rc != 0) + RETURN(rc); + + down_read(<ds->ltd_rw_sem); + cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) { + ltd = lfsck_ltd2tgt(ltds, idx); + LASSERT(ltd != NULL); + + mdts_count[ltd->ltd_namespace_status]++; + *repaired += 
ltd->ltd_namespace_repaired; + } + up_read(<ds->ltd_rw_sem); + + down_read(&com->lc_sem); + mdts_count[ns->ln_status]++; + lfsck_namespace_repaired(ns, repaired); + up_read(&com->lc_sem); + + RETURN(0); +} + static int lfsck_namespace_query(const struct lu_env *env, - struct lfsck_component *com) + struct lfsck_component *com, + struct lfsck_request *req, + struct lfsck_reply *rep, + struct lfsck_query *que, int idx) { struct lfsck_namespace *ns = com->lc_file_ram; + int rc = 0; + + if (que != NULL) { + LASSERT(com->lc_lfsck->li_master); + + rc = lfsck_namespace_query_all(env, com, + que->lu_mdts_count[idx], + &que->lu_repaired[idx]); + } else { + down_read(&com->lc_sem); + rep->lr_status = ns->ln_status; + if (req->lr_flags & LEF_QUERY_ALL) + lfsck_namespace_repaired(ns, &rep->lr_repaired); + up_read(&com->lc_sem); + } - return ns->ln_status; + return rc; } static struct lfsck_operations lfsck_namespace_ops = { @@ -4726,13 +4900,15 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, struct dt_insert_rec *rec = &info->lti_dt_rec; struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; const struct lu_name *cname; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); struct linkea_data ldata = { NULL }; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; struct lu_buf linkea_buf; struct lu_buf lmv_buf; struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; - struct dt_device *dev = lfsck_obj2dt_dev(child); + struct dt_device *dev = lfsck->li_next; struct thandle *th = NULL; int rc = 0; __u16 type = lnr->lnr_type; @@ -4748,16 +4924,27 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (!create || bk->lb_param & LPF_DRYRUN) GOTO(log, rc = 0); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); - if (rc != 0) - GOTO(log, rc); + /* We may need to create the sub-objects of the @child via LOD, + * so make the modification 
based on lfsck->li_next. */ + + parent = lfsck_object_locate(dev, parent); + if (IS_ERR(parent)) + GOTO(log, rc = PTR_ERR(parent)); + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(log, rc = -ENOTDIR); + + child = lfsck_object_locate(dev, child); + if (IS_ERR(child)) + GOTO(log, rc = PTR_ERR(child)); - rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf2, + cname, pfid); if (rc != 0) GOTO(log, rc); - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh, + MDS_INODELOCK_UPDATE, LCK_PR); if (rc != 0) GOTO(log, rc); @@ -4765,10 +4952,6 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(log, rc); - th = dt_trans_create(env, dev); - if (IS_ERR(th)) - GOTO(log, rc = PTR_ERR(th)); - /* Set the ctime as zero, then others can know it is created for * repairing dangling name entry by LFSCK. And if the LFSCK made * wrong decision and the real MDT-object has been found later, @@ -4779,7 +4962,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, LA_ATIME | LA_MTIME | LA_CTIME; child->do_ops->do_ah_init(env, hint, parent, child, - la->la_mode & S_IFMT); + la->la_mode & S_IFMT); memset(dof, 0, sizeof(*dof)); dof->dof_type = dt_mode_to_dft(type); @@ -4787,6 +4970,10 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, * the MDT-object without stripes (dof->dof_reg.striped = 0). related * OST-objects will be created when write open. */ + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(log, rc = PTR_ERR(th)); + /* 1a. create child. */ rc = dt_declare_create(env, child, la, hint, dof, th); if (rc != 0) @@ -4796,35 +4983,35 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (unlikely(!dt_try_as_dir(env, child))) GOTO(stop, rc = -ENOTDIR); - /* 2a. insert dot into child dir */ + /* 2a. 
increase child nlink */ + rc = dt_declare_ref_add(env, child, th); + if (rc != 0) + GOTO(stop, rc); + + /* 3a. insert dot into child dir */ rec->rec_type = S_IFDIR; - rec->rec_fid = lfsck_dto2fid(child); + rec->rec_fid = cfid; rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, (const struct dt_key *)dot, th); if (rc != 0) GOTO(stop, rc); - /* 3a. insert dotdot into child dir */ - rec->rec_fid = lfsck_dto2fid(parent); + /* 4a. insert dotdot into child dir */ + rec->rec_fid = pfid; rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(stop, rc); - /* 4a. increase child nlink */ - rc = dt_declare_ref_add(env, child, th); - if (rc != 0) - GOTO(stop, rc); - /* 5a. generate slave LMV EA. */ if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) { int idx; idx = lfsck_shard_name_to_index(env, lnr->lnr_name, lnr->lnr_namelen, - type, lfsck_dto2fid(child)); + type, cfid); if (unlikely(idx < 0)) GOTO(stop, rc = idx); @@ -4849,7 +5036,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc = (rc == -EEXIST ? 1 : rc)); @@ -4860,34 +5047,30 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc)); if (S_ISDIR(type)) { - if (unlikely(!dt_try_as_dir(env, child))) - GOTO(unlock, rc = -ENOTDIR); - - /* 2b. insert dot into child dir */ - rec->rec_type = S_IFDIR; - rec->rec_fid = lfsck_dto2fid(child); - rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, BYPASS_CAPA, 1); + /* 2b. increase child nlink */ + rc = dt_ref_add(env, child, th); if (rc != 0) GOTO(unlock, rc); - /* 3b. insert dotdot into child dir */ - rec->rec_fid = lfsck_dto2fid(parent); + /* 3b. 
insert dot into child dir */ + rec->rec_type = S_IFDIR; + rec->rec_fid = cfid; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, - BYPASS_CAPA, 1); + (const struct dt_key *)dot, th, 1); if (rc != 0) GOTO(unlock, rc); - /* 4b. increase child nlink */ - rc = dt_ref_add(env, child, th); + /* 4b. insert dotdot into child dir */ + rec->rec_fid = pfid; + rc = dt_insert(env, child, (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock, rc); /* 5b. generate slave LMV EA. */ if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) { rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, - 0, th, BYPASS_CAPA); + 0, th); if (rc != 0) GOTO(unlock, rc); } @@ -4895,7 +5078,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, /* 6b. insert linkEA for child. */ rc = dt_xattr_set(env, child, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + XATTR_NAME_LINK, 0, th); GOTO(unlock, rc); @@ -4906,13 +5089,12 @@ stop: dt_trans_stop(env, dev, th); log: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling " "reference for: parent "DFID", child "DFID", type %u, " "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck), - PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)), - type, cname->ln_name, - create ? "Create the lost OST-object as required" : + PFID(pfid), PFID(cfid), type, cname->ln_name, + create ? 
"Create the lost MDT-object as required" : "Keep the MDT-object there by default", rc); if (rc <= 0) { @@ -4933,6 +5115,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; struct linkea_data ldata = { NULL }; const struct lu_name *cname; struct thandle *handle = NULL; @@ -4940,7 +5123,8 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, container_of0(lar, struct lfsck_namespace_req, lnr_lar); struct dt_object *dir = NULL; struct dt_object *obj = NULL; - const struct lu_fid *pfid; + struct lfsck_assistant_object *lso = lar->lar_parent; + const struct lu_fid *pfid = &lso->lso_fid; struct dt_device *dev = NULL; struct lustre_handle lh = { 0 }; bool repaired = false; @@ -4949,20 +5133,16 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, bool newdata; bool log = false; bool bad_hash = false; + bool bad_linkea = false; int idx = 0; int count = 0; int rc = 0; enum lfsck_namespace_inconsistency_type type = LNIT_NONE; ENTRY; - dir = lfsck_object_find_bottom(env, lfsck, &lar->lar_fid); - if (IS_ERR(dir)) - RETURN(PTR_ERR(dir)); - - if (unlikely(lfsck_is_dead_obj(dir))) - GOTO(put_dir, rc = 0); + if (lso->lso_dead) + RETURN(0); - pfid = lfsck_dto2fid(dir); la->la_nlink = 0; if (lnr->lnr_attr & LUDA_UPGRADE) { ns->ln_flags |= LF_UPGRADE; @@ -5003,26 +5183,25 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, } if (unlikely(lnr->lnr_dir_cookie == MDS_DIR_END_OFF)) { - rc = lfsck_namespace_striped_dir_rescan(env, com, dir, lnr); + rc = lfsck_namespace_striped_dir_rescan(env, com, lnr); - GOTO(put_dir, rc); + RETURN(rc); } - if (lnr->lnr_name[0] == '.' 
&& - (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid)))) + if (fid_seq_is_dot(fid_seq(&lnr->lnr_fid))) GOTO(out, rc = 0); if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) { - rc = lfsck_namespace_handle_striped_master(env, com, dir, lnr); + rc = lfsck_namespace_handle_striped_master(env, com, lnr); - GOTO(put_dir, rc); + RETURN(rc); } idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid); if (idx < 0) GOTO(out, rc = idx); - if (idx == lfsck_dev_idx(lfsck->li_bottom)) { + if (idx == lfsck_dev_idx(lfsck)) { if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) GOTO(out, rc = 0); @@ -5040,7 +5219,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, GOTO(out, rc); } - ltd = LTD_TGT(&lfsck->li_mdt_descs, idx); + ltd = lfsck_ltd2tgt(&lfsck->li_mdt_descs, idx); if (unlikely(ltd == NULL)) { CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which " "did not join the namespace LFSCK\n", @@ -5061,6 +5240,15 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, if (dt_object_exists(obj) == 0) { dangling: + if (dir == NULL) { + dir = lfsck_assistant_object_load(env, lfsck, lso); + if (IS_ERR(dir)) { + rc = PTR_ERR(dir); + + GOTO(trace, rc == -ENOENT ? 
0 : rc); + } + } + rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name); if (rc == 0) { if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv, @@ -5080,7 +5268,7 @@ dangling: GOTO(out, rc); } - if (!(bk->lb_param & LPF_DRYRUN) && repaired) { + if (!(bk->lb_param & LPF_DRYRUN) && lad->lad_advance_lock) { again: rc = lfsck_ibits_lock(env, lfsck, obj, &lh, @@ -5097,7 +5285,7 @@ again: if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, handle); + rc = dt_trans_start_local(env, dev, handle); if (rc != 0) GOTO(stop, rc); @@ -5105,10 +5293,6 @@ again: dtlocked = true; } - rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name); - if (rc != 0) - GOTO(stop, rc); - rc = lfsck_links_read(env, obj, &ldata); if (unlikely(rc == -ENOENT)) { if (handle != NULL) { @@ -5138,13 +5322,12 @@ again: goto stop; } - ns->ln_flags |= LF_INCONSISTENT; - /* If the name entry hash does not match the slave striped * directory, and the name entry does not match also, then * it is quite possible that name entry is corrupted. */ if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv, lnr->lnr_name, lnr->lnr_namelen)) { + ns->ln_flags |= LF_INCONSISTENT; type = LNIT_BAD_DIRENT; GOTO(stop, rc = 0); @@ -5155,6 +5338,7 @@ again: * not recognize the name entry, then it is quite possible * that the name entry is corrupted. */ if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) { + ns->ln_flags |= LF_INCONSISTENT; type = LNIT_BAD_DIRENT; GOTO(stop, rc = 0); @@ -5177,7 +5361,6 @@ again: type = LNIT_BAD_TYPE; count = 1; - ns->ln_flags |= LF_INCONSISTENT; /* The magic crashed, we are not sure whether there are more * corrupt data in the linkea, so remove all linkea entries. 
*/ remove = true; @@ -5188,27 +5371,55 @@ again: type = LNIT_BAD_TYPE; count = 1; - ns->ln_flags |= LF_UPGRADE; remove = false; newdata = true; nodata: if (bk->lb_param & LPF_DRYRUN) { + if (rc == -ENODATA) + ns->ln_flags |= LF_UPGRADE; + else + ns->ln_flags |= LF_INCONSISTENT; ns->ln_linkea_repaired++; repaired = true; log = true; goto stop; } - if (!lustre_handle_is_used(&lh)) + if (!lustre_handle_is_used(&lh)) { + remove = false; + newdata = false; + type = LNIT_NONE; + goto again; + } + + LASSERT(handle != NULL); + + if (dir == NULL) { + dir = lfsck_assistant_object_load(env, lfsck, lso); + if (IS_ERR(dir)) { + rc = PTR_ERR(dir); + + GOTO(stop, rc == -ENOENT ? 0 : rc); + } + } + + rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name); + if (rc != 0) + GOTO(stop, rc); + + bad_linkea = true; + if (!remove && newdata) + ns->ln_flags |= LF_UPGRADE; + else if (remove || !(ns->ln_flags & LF_UPGRADE)) + ns->ln_flags |= LF_INCONSISTENT; if (remove) { LASSERT(newdata); - rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle, - BYPASS_CAPA); - if (rc != 0) + rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle); + if (rc != 0 && rc != -ENOENT && rc != -ENODATA) GOTO(stop, rc); } @@ -5220,37 +5431,8 @@ nodata: } rc = linkea_add_buf(&ldata, cname, pfid); - if (rc != 0) - GOTO(stop, rc); - - rc = lfsck_links_write(env, obj, &ldata, handle); - if (unlikely(rc == -ENOSPC) && - S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) { - if (handle != NULL) { - LASSERT(dt_write_locked(env, obj)); - - dt_write_unlock(env, obj); - dtlocked = false; - - dt_trans_stop(env, dev, handle); - handle = NULL; - - lfsck_ibits_unlock(&lh, LCK_EX); - } - - rc = lfsck_namespace_trace_update(env, com, - &lnr->lnr_fid, LNTF_SKIP_NLINK, true); - if (rc != 0) - /* If we cannot record this object in the - * LFSCK tracing, we have to mark the LFSCK - * as LF_INCOMPLETE, then the LFSCK will - * skip nlink attribute verification for - * all objects. 
*/ - ns->ln_flags |= LF_INCOMPLETE; - - GOTO(out, rc = 0); - } - + if (rc == 0) + rc = lfsck_links_write(env, obj, &ldata, handle); if (rc != 0) GOTO(stop, rc); @@ -5282,13 +5464,31 @@ out: ns->ln_flags |= LF_INCONSISTENT; log = false; + if (dir == NULL) { + dir = lfsck_assistant_object_load(env, lfsck, lso); + if (IS_ERR(dir)) { + rc = PTR_ERR(dir); + + GOTO(trace, rc == -ENOENT ? 0 : rc); + } + } + rc = lfsck_namespace_repair_bad_name_hash(env, com, dir, lnr->lnr_lmv, lnr->lnr_name); - if (rc >= 0) + if (rc == 0) bad_hash = true; } if (rc >= 0) { + if (type != LNIT_NONE && dir == NULL) { + dir = lfsck_assistant_object_load(env, lfsck, lso); + if (IS_ERR(dir)) { + rc = PTR_ERR(dir); + + GOTO(trace, rc == -ENOENT ? 0 : rc); + } + } + switch (type) { case LNIT_BAD_TYPE: log = false; @@ -5314,38 +5514,38 @@ out: break; } - if (count == 1 && S_ISREG(lfsck_object_type(obj))) - dt_attr_get(env, obj, la, BYPASS_CAPA); + if (obj != NULL && count == 1 && + S_ISREG(lfsck_object_type(obj))) + dt_attr_get(env, obj, la); } +trace: down_write(&com->lc_sem); if (rc < 0) { CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle " "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), - PFID(lfsck_dto2fid(dir)), + lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), PFID(pfid), lnr->lnr_namelen, lnr->lnr_name, rc); lfsck_namespace_record_failure(env, lfsck, ns); if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG || rc == -ETIMEDOUT || rc == -EHOSTDOWN || rc == -EHOSTUNREACH || rc == -EINPROGRESS) && - dev != NULL && dev != lfsck->li_next) + dev != NULL && dev != lfsck->li_bottom) lfsck_lad_set_bitmap(env, com, idx); if (!(bk->lb_param & LPF_FAILOUT)) rc = 0; } else { - if (log) - CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant " - "repaired the entry: "DFID", parent "DFID - ", name %.*s\n", lfsck_lfsck2name(lfsck), - PFID(&lnr->lnr_fid), - PFID(lfsck_dto2fid(dir)), - lnr->lnr_namelen, lnr->lnr_name); - if (repaired) { 
ns->ln_items_repaired++; + if (log) + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant " + "repaired the entry: "DFID", parent "DFID + ", name %.*s, type %d\n", + lfsck_lfsck2name(lfsck), + PFID(&lnr->lnr_fid), PFID(pfid), + lnr->lnr_namelen, lnr->lnr_name, type); switch (type) { case LNIT_DANGLING: @@ -5372,8 +5572,17 @@ out: ns->ln_name_hash_repaired++; /* Not count repeatedly. */ - if (!repaired) + if (!repaired) { ns->ln_items_repaired++; + if (log) + CDEBUG(D_LFSCK, "%s: namespace LFSCK " + "assistant repaired the entry: " + DFID", parent "DFID + ", name %.*s\n", + lfsck_lfsck2name(lfsck), + PFID(&lnr->lnr_fid), PFID(pfid), + lnr->lnr_namelen, lnr->lnr_name); + } if (bk->lb_param & LPF_DRYRUN && lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) @@ -5393,8 +5602,10 @@ out: if (obj != NULL && !IS_ERR(obj)) lfsck_object_put(env, obj); -put_dir: - lu_object_put(env, &dir->do_lu); + if (dir != NULL && !IS_ERR(dir)) + lfsck_object_put(env, dir); + + lad->lad_advance_lock = bad_linkea; return rc; } @@ -5456,7 +5667,7 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, obj = com->lc_sub_trace_objs[idx].lsto_obj; fid_cpu_to_be(key, &ent->lde_fid); rc = dt_lookup(env, obj, (struct dt_rec *)&flags, - (const struct dt_key *)key, BYPASS_CAPA); + (const struct dt_key *)key); if (rc == 0) { exist = true; flags |= LNTF_CHECK_ORPHAN; @@ -5510,8 +5721,7 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, GOTO(stop, rc); /* b1. remove name entry from backend /lost+found */ - rc = dt_delete(env, parent, (const struct dt_key *)ent->lde_name, th, - BYPASS_CAPA); + rc = dt_delete(env, parent, (const struct dt_key *)ent->lde_name, th); if (rc != 0) GOTO(stop, rc); @@ -5526,20 +5736,19 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, if (exist) { /* a3. remove child's FID from the LFSCK trace file. 
*/ - rc = dt_delete(env, obj, (const struct dt_key *)key, th, - BYPASS_CAPA); + rc = dt_delete(env, obj, (const struct dt_key *)key, th); if (rc != 0) GOTO(stop, rc); } else { /* b4. set child's ctime as 1 */ - rc = dt_attr_set(env, child, la, th, BYPASS_CAPA); + rc = dt_attr_set(env, child, la, th); if (rc != 0) GOTO(stop, rc); } /* b5. insert child's FID into the LFSCK trace file. */ rc = dt_insert(env, obj, (const struct dt_rec *)&flags, - (const struct dt_key *)key, th, BYPASS_CAPA, 1); + (const struct dt_key *)key, th, 1); GOTO(stop, rc = (rc == 0 ? 1 : rc)); @@ -5547,7 +5756,7 @@ stop: dt_trans_stop(env, dev, th); out: - lu_object_put(env, &child->do_lu); + lfsck_object_put(env, child); return rc; } @@ -5577,19 +5786,19 @@ static void lfsck_namespace_scan_local_lpf(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct ptlrpc_thread *thread = &lfsck->li_thread; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; - struct dt_device *dev = lfsck->li_bottom; struct lfsck_namespace *ns = com->lc_file_ram; struct dt_object *parent; const struct dt_it_ops *iops; struct dt_it *di; - struct seq_server_site *ss = - lu_site2seq(dev->dd_lu_dev.ld_site); + struct seq_server_site *ss = lfsck_dev_site(lfsck); __u64 cookie; + __u32 idx = lfsck_dev_idx(lfsck); int rc = 0; __u16 type; ENTRY; - parent = lfsck_object_find_by_dev(env, dev, &LU_BACKEND_LPF_FID); + parent = lfsck_object_find_by_dev(env, lfsck->li_bottom, + &LU_BACKEND_LPF_FID); if (IS_ERR(parent)) { CERROR("%s: fail to find backend /lost+found: rc = %ld\n", lfsck_lfsck2name(lfsck), PTR_ERR(parent)); @@ -5608,7 +5817,7 @@ static void lfsck_namespace_scan_local_lpf(const struct lu_env *env, com->lc_new_scanned = 0; iops = &parent->do_index_ops->dio_it; - di = iops->init(env, parent, LUDA_64BITHASH | LUDA_TYPE, BYPASS_CAPA); + di = iops->init(env, parent, LUDA_64BITHASH | LUDA_TYPE); if (IS_ERR(di)) GOTO(out, rc = PTR_ERR(di)); @@ -5649,7 +5858,7 @@ static void 
lfsck_namespace_scan_local_lpf(const struct lu_env *env, fid_seq(&ent->lde_fid), range); if (rc != 0) goto skip; - } else if (lfsck_dev_idx(dev) != 0) { + } else if (idx != 0) { /* If the returned FID is IGIF, then there are three * possible cases: * @@ -5674,8 +5883,7 @@ static void lfsck_namespace_scan_local_lpf(const struct lu_env *env, "in the backend /lost+found on the MDT %04x, " "to be safe, skip it.\n", lfsck_lfsck2name(lfsck), ent->lde_namelen, - ent->lde_name, PFID(&ent->lde_fid), - lfsck_dev_idx(dev)); + ent->lde_name, PFID(&ent->lde_fid), idx); goto skip; } @@ -5715,7 +5923,7 @@ out: CDEBUG(D_LFSCK, "%s: stop to scan backend /lost+found: rc = %d\n", lfsck_lfsck2name(lfsck), rc); - lu_object_put(env, &parent->do_lu); + lfsck_object_put(env, parent); } /** @@ -5752,6 +5960,7 @@ static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env, (struct lu_dirent *)info->lti_key; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct ptlrpc_thread *thread = &lfsck->li_thread; + struct lfsck_assistant_object *lso = NULL; struct lfsck_namespace_req *lnr; struct lfsck_assistant_req *lar; int rc; @@ -5786,7 +5995,20 @@ static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env, if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen)) goto next; - lnr = lfsck_namespace_assistant_req_init(lfsck, ent, type); + if (lso == NULL) { + lso = lfsck_assistant_object_init(env, + lfsck_dto2fid(dir), NULL, + lfsck->li_pos_current.lp_oit_cookie, true); + if (IS_ERR(lso)) { + if (bk->lb_param & LPF_FAILOUT) + GOTO(out, rc = PTR_ERR(lso)); + + lso = NULL; + goto next; + } + } + + lnr = lfsck_namespace_assistant_req_init(lfsck, lso, ent, type); if (IS_ERR(lnr)) { if (bk->lb_param & LPF_FAILOUT) GOTO(out, rc = PTR_ERR(lnr)); @@ -5808,6 +6030,9 @@ next: } while (rc == 0); out: + if (lso != NULL && !IS_ERR(lso)) + lfsck_assistant_object_put(env, lso); + lfsck_close_dir(env, lfsck, rc); if (rc <= 0) RETURN(rc); @@ -5846,7 +6071,7 @@ 
lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env, __u8 flags = 0; ENTRY; - di = iops->init(env, obj, 0, BYPASS_CAPA); + di = iops->init(env, obj, 0); if (IS_ERR(di)) RETURN(PTR_ERR(di)); @@ -5886,7 +6111,7 @@ lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env, goto checkpoint; } - target = lfsck_object_find_by_dev(env, lfsck->li_bottom, &fid); + target = lfsck_object_find_bottom(env, lfsck, &fid); if (IS_ERR(target)) { rc = PTR_ERR(target); goto checkpoint; @@ -5908,8 +6133,9 @@ checkpoint: down_write(&com->lc_sem); com->lc_new_checked++; com->lc_new_scanned++; - if (rc >= 0 && fid_is_sane(&fid)) + if (rc >= 0) ns->ln_fid_latest_scanned_phase2 = fid; + if (rc > 0) ns->ln_objs_repaired_phase2++; else if (rc < 0) @@ -5929,10 +6155,8 @@ checkpoint: ns->ln_time_last_checkpoint = cfs_time_current_sec(); ns->ln_objs_checked_phase2 += com->lc_new_checked; com->lc_new_checked = 0; - rc = lfsck_namespace_store(env, com, false); + lfsck_namespace_store(env, com); up_write(&com->lc_sem); - if (rc != 0) - GOTO(put, rc); com->lc_time_last_checkpoint = cfs_time_current(); com->lc_time_next_checkpoint = @@ -6012,15 +6236,19 @@ static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env, struct lfsck_assistant_data *lad = com->lc_data; struct lfsck_namespace_req *lnr; + if (((struct lfsck_namespace *)(com->lc_file_ram))->ln_status != + LS_SCANNING_PHASE1) + return; + if (list_empty(&lad->lad_req_list)) return; lnr = list_entry(lad->lad_req_list.next, struct lfsck_namespace_req, lnr_lar.lar_list); - pos->lp_oit_cookie = lnr->lnr_oit_cookie; + pos->lp_oit_cookie = lnr->lnr_lar.lar_parent->lso_oit_cookie; pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1; - pos->lp_dir_parent = lnr->lnr_lar.lar_fid; + pos->lp_dir_parent = lnr->lnr_lar.lar_parent->lso_fid; } static int lfsck_namespace_double_scan_result(const struct lu_env *env, @@ -6032,7 +6260,7 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, 
down_write(&com->lc_sem); ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() + - HALF_SEC - lfsck->li_time_last_checkpoint); + HALF_SEC - com->lc_time_last_checkpoint); ns->ln_time_last_checkpoint = cfs_time_current_sec(); ns->ln_objs_checked_phase2 += com->lc_new_checked; com->lc_new_checked = 0; @@ -6055,7 +6283,7 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, ns->ln_status = LS_FAILED; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); return rc; @@ -6126,7 +6354,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, down_read(&ltds->ltd_rw_sem); cfs_foreach_bit(lad->lad_bitmap, idx) { - ltd = LTD_TGT(ltds, idx); + ltd = lfsck_ltd2tgt(ltds, idx); LASSERT(ltd != NULL); laia->laia_ltd = ltd; @@ -6170,7 +6398,6 @@ struct lfsck_assistant_operations lfsck_namespace_assistant_ops = { * entries, then re-generate the linkEA with the given information. * * \param[in] env pointer to the thread context - * \param[in] dev pointer to the dt_device * \param[in] obj pointer to the dt_object to be handled * \param[in] cname the name for the child in the parent directory * \param[in] pfid the parent directory's FID for the linkEA @@ -6178,10 +6405,10 @@ struct lfsck_assistant_operations lfsck_namespace_assistant_ops = { * \retval 0 for success * \retval negative error number on failure */ -int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev, - struct dt_object *obj, const struct lu_name *cname, - const struct lu_fid *pfid) +int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, + const struct lu_name *cname, const struct lu_fid *pfid) { + struct dt_device *dev = lfsck_obj2dev(obj); struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; struct thandle *th; @@ -6192,7 +6419,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev, LASSERT(S_ISDIR(lfsck_object_type(obj))); - rc = 
lfsck_links_read(env, obj, &ldata); + rc = lfsck_links_read_with_rec(env, obj, &ldata); if (rc == -ENODATA) { dirty = true; } else if (rc == 0) { @@ -6209,11 +6436,8 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev, if (!dirty) RETURN(rc); - rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf); - if (rc != 0) - RETURN(rc); - - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf, + cname, pfid); if (rc != 0) RETURN(rc); @@ -6234,7 +6458,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev, dt_write_lock(env, obj, 0); rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, fl, th, BYPASS_CAPA); + XATTR_NAME_LINK, fl, th); dt_write_unlock(env, obj); GOTO(stop, rc); @@ -6264,14 +6488,11 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, struct linkea_data ldata = { NULL }; int rc; - rc = lfsck_links_read(env, obj, &ldata); - if (rc != 0) + rc = lfsck_links_read_with_rec(env, obj, &ldata); + if (rc) return rc; linkea_first_entry(&ldata); - if (ldata.ld_lee == NULL) - return -ENODATA; - linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid); /* To guarantee the 'name' is terminated with '0'. 
*/ memcpy(name, cname->ln_name, cname->ln_namelen); @@ -6285,10 +6506,10 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, * * \param[in] env pointer to the thread context * \param[in] lfsck pointer to the lfsck instance - * \param[in] parent pointer to the parent directory that holds + * \param[in] dir pointer to the directory that holds * the name entry * \param[in] name the name for the entry to be updated - * \param[in] pfid the new PFID for the name entry + * \param[in] fid the new FID for the name entry referenced * \param[in] type the type for the name entry to be updated * * \retval 0 for success @@ -6296,19 +6517,20 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, */ int lfsck_update_name_entry(const struct lu_env *env, struct lfsck_instance *lfsck, - struct dt_object *parent, const char *name, - const struct lu_fid *pfid, __u32 type) + struct dt_object *dir, const char *name, + const struct lu_fid *fid, __u32 type) { - struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; - struct dt_device *dev = lfsck->li_next; - struct lustre_handle lh = { 0 }; - struct thandle *th; - int rc; - bool exists = true; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct dt_device *dev = lfsck_obj2dev(dir); + struct thandle *th; + int rc; + bool exists = true; ENTRY; - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, dir, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -6316,27 +6538,26 @@ int lfsck_update_name_entry(const struct lu_env *env, if (IS_ERR(th)) GOTO(unlock, rc = PTR_ERR(th)); - rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th); + rc = dt_declare_delete(env, dir, (const struct dt_key *)name, th); if (rc != 0) GOTO(stop, rc); rec->rec_type = type; - rec->rec_fid = pfid; - rc = 
dt_declare_insert(env, parent, (const struct dt_rec *)rec, + rec->rec_fid = fid; + rc = dt_declare_insert(env, dir, (const struct dt_rec *)rec, (const struct dt_key *)name, th); if (rc != 0) GOTO(stop, rc); - rc = dt_declare_ref_add(env, parent, th); + rc = dt_declare_ref_add(env, dir, th); if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc); - rc = dt_delete(env, parent, (const struct dt_key *)name, th, - BYPASS_CAPA); + rc = dt_delete(env, dir, (const struct dt_key *)name, th); if (rc == -ENOENT) { exists = false; rc = 0; @@ -6345,12 +6566,12 @@ int lfsck_update_name_entry(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + rc = dt_insert(env, dir, (const struct dt_rec *)rec, + (const struct dt_key *)name, th, 1); if (rc == 0 && S_ISDIR(type) && !exists) { - dt_write_lock(env, parent, 0); - rc = dt_ref_add(env, parent, th); - dt_write_unlock(env, parent); + dt_write_lock(env, dir, 0); + rc = dt_ref_add(env, dir, th); + dt_write_unlock(env, dir); } GOTO(stop, rc); @@ -6359,11 +6580,10 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); - + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck), - PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc); + PFID(lfsck_dto2fid(dir)), name, PFID(fid), type, rc); return rc; } @@ -6417,9 +6637,10 @@ int lfsck_namespace_setup(const struct lu_env *env, if (unlikely(!dt_try_as_dir(env, root))) GOTO(out, rc = -ENOTDIR); - obj = local_file_find_or_create(env, lfsck->li_los, root, - LFSCK_NAMESPACE, - S_IFREG | S_IRUGO | S_IWUSR); + obj = local_index_find_or_create(env, lfsck->li_los, root, + LFSCK_NAMESPACE, + S_IFREG | S_IRUGO | S_IWUSR, + &dt_lfsck_namespace_features); if (IS_ERR(obj)) GOTO(out, rc = 
PTR_ERR(obj)); @@ -6430,7 +6651,8 @@ int lfsck_namespace_setup(const struct lu_env *env, else if (rc < 0) rc = lfsck_namespace_reset(env, com, true); else - rc = lfsck_namespace_load_sub_trace_files(env, com, false); + rc = lfsck_load_sub_trace_files(env, com, + &dt_lfsck_namespace_features, LFSCK_NAMESPACE, false); if (rc != 0) GOTO(out, rc); @@ -6468,7 +6690,7 @@ int lfsck_namespace_setup(const struct lu_env *env, out: if (root != NULL && !IS_ERR(root)) - lu_object_put(env, &root->do_lu); + lfsck_object_put(env, root); if (rc != 0) { lfsck_component_cleanup(env, com); CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",