X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=6e29717e5c0ffe32c25381cffb76c45d49e89079;hp=f4308bc02d18e753c84b83312f4c7b3ed27b545f;hb=4d408c9aed9adaf1f4e2ea87851728a1cf662594;hpb=44888417ecbf09fc6f294311dd98914aefda05c4 diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index f4308bc..6e29717 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2013, 2014, Intel Corporation. */ /* * lustre/lfsck/lfsck_namespace.c @@ -41,7 +41,15 @@ #include "lfsck_internal.h" -#define LFSCK_NAMESPACE_MAGIC 0xA0629D03 +#define LFSCK_NAMESPACE_MAGIC_V1 0xA0629D03 +#define LFSCK_NAMESPACE_MAGIC_V2 0xA0621A0B + +/* For Lustre-2.x (x <= 6), the namespace LFSCK used LFSCK_NAMESPACE_MAGIC_V1 + * as the trace file magic. When downgrade to such old release, the old LFSCK + * will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in the new trace file, + * then it will reset the whole LFSCK, and will not cause start failure. The + * similar case will happen when upgrade from such old release. */ +#define LFSCK_NAMESPACE_MAGIC LFSCK_NAMESPACE_MAGIC_V2 enum lfsck_nameentry_check { LFSCK_NAMEENTRY_DEAD = 1, /* The object has been unlinked. */ @@ -49,8 +57,6 @@ enum lfsck_nameentry_check { LFSCK_NAMEENTRY_RECREATED = 3, /* The entry has been recreated. */ }; -static const char lfsck_namespace_name[] = "lfsck_namespace"; - static struct lfsck_namespace_req * lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck, struct lu_dirent *ent, __u16 type) @@ -319,9 +325,19 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, } /** - * \retval +ve: the lfsck_namespace is broken, the caller should reset it. - * \retval 0: succeed. - * \retval -ve: failed cases. + * Load namespace LFSCK statistics information from the trace file. 
+ * + * For old release (Lustre-2.6 or older), the statistics information was + * stored as XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But in Lustre-2.7, we need + * more statistics information. To avoid confusing old MDT when downgrade, + * Lustre-2.7 stores the namespace LFSCK statistics information as new + * XATTR_NAME_LFSCK_NAMESPACE EA. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * + * \retval 0 for success + * \retval negative error number on failure */ static int lfsck_namespace_load(const struct lu_env *env, struct lfsck_component *com) @@ -341,7 +357,7 @@ static int lfsck_namespace_load(const struct lu_env *env, CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic " "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck), ns->ln_magic, LFSCK_NAMESPACE_MAGIC); - rc = 1; + rc = -ESTALE; } else { rc = 0; } @@ -350,13 +366,22 @@ static int lfsck_namespace_load(const struct lu_env *env, "expected = %d: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc); if (rc >= 0) - rc = 1; + rc = -ESTALE; + } else { + /* Check whether it is old trace file or not. + * If yes, it should be reset via returning -ESTALE. 
*/ + rc = dt_xattr_get(env, com->lc_obj, + lfsck_buf_get(env, com->lc_file_disk, len), + XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA); + if (rc >= 0) + rc = -ESTALE; } + return rc; } static int lfsck_namespace_store(const struct lu_env *env, - struct lfsck_component *com) + struct lfsck_component *com, bool init) { struct dt_object *obj = com->lc_obj; struct lfsck_instance *lfsck = com->lc_lfsck; @@ -367,6 +392,9 @@ static int lfsck_namespace_store(const struct lu_env *env, __u32 nbits = 0; int len = com->lc_file_size; int rc; +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) + struct lu_buf tbuf = { &len, sizeof(len) }; +#endif ENTRY; if (lad != NULL) { @@ -398,6 +426,20 @@ static int lfsck_namespace_store(const struct lu_env *env, GOTO(out, rc); } +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) + /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy + * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x, + * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA, + * then reset the namespace LFSCK trace file. 
*/ + if (init) { + rc = dt_declare_xattr_set(env, obj, &tbuf, + XATTR_NAME_LFSCK_NAMESPACE_OLD, + LU_XATTR_CREATE, handle); + if (rc != 0) + GOTO(out, rc); + } +#endif + rc = dt_trans_start_local(env, lfsck->li_bottom, handle); if (rc != 0) GOTO(out, rc); @@ -411,6 +453,13 @@ static int lfsck_namespace_store(const struct lu_env *env, XATTR_NAME_LFSCK_BITMAP, 0, handle, BYPASS_CAPA); +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0) + if (rc == 0 && init) + rc = dt_xattr_set(env, obj, &tbuf, + XATTR_NAME_LFSCK_NAMESPACE_OLD, + LU_XATTR_CREATE, handle, BYPASS_CAPA); +#endif + GOTO(out, rc); out: @@ -423,6 +472,70 @@ log: return rc; } +static struct dt_object * +lfsck_namespace_load_one_trace_file(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *parent, + const char *name, + const struct dt_index_features *ft, + bool reset) +{ + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_object *obj; + int rc; + + if (reset) { + rc = local_object_unlink(env, lfsck->li_bottom, parent, name); + if (rc != 0 && rc != -ENOENT) + return ERR_PTR(rc); + } + + if (ft != NULL) + obj = local_index_find_or_create(env, lfsck->li_los, parent, + name, S_IFREG | S_IRUGO | S_IWUSR, ft); + else + obj = local_file_find_or_create(env, lfsck->li_los, parent, + name, S_IFREG | S_IRUGO | S_IWUSR); + + return obj; +} + +static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env, + struct lfsck_component *com, + bool reset) +{ + char *name = lfsck_env_info(env)->lti_key; + struct lfsck_sub_trace_obj *lsto; + struct dt_object *obj; + int rc; + int i; + + for (i = 0, lsto = &com->lc_sub_trace_objs[0]; + i < LFSCK_STF_COUNT; i++, lsto++) { + snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i); + if (lsto->lsto_obj != NULL) { + if (!reset) + continue; + + lu_object_put(env, &lsto->lsto_obj->do_lu); + lsto->lsto_obj = NULL; + } + + obj = lfsck_namespace_load_one_trace_file(env, com, + com->lc_lfsck->li_lfsck_dir, + name, &dt_lfsck_features, reset); 
+ if (IS_ERR(obj)) + return PTR_ERR(obj); + + lsto->lsto_obj = obj; + rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features); + if (rc != 0) + return rc; + } + + return 0; +} + static int lfsck_namespace_init(const struct lu_env *env, struct lfsck_component *com) { @@ -433,8 +546,11 @@ static int lfsck_namespace_init(const struct lu_env *env, ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; down_write(&com->lc_sem); - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, true); up_write(&com->lc_sem); + if (rc == 0) + rc = lfsck_namespace_load_sub_trace_files(env, com, true); + return rc; } @@ -456,10 +572,11 @@ int lfsck_namespace_trace_update(const struct lu_env *env, const __u8 flags, bool add) { struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_object *obj = com->lc_obj; + struct dt_object *obj; struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; struct dt_device *dev = lfsck->li_bottom; struct thandle *th = NULL; + int idx; int rc = 0; __u8 old = 0; __u8 new = 0; @@ -467,7 +584,12 @@ int lfsck_namespace_trace_update(const struct lu_env *env, LASSERT(flags != 0); - down_write(&com->lc_sem); + if (unlikely(!fid_is_sane(fid))) + RETURN(0); + + idx = lfsck_sub_trace_file_fid2idx(fid); + obj = com->lc_sub_trace_objs[idx].lsto_obj; + mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); fid_cpu_to_be(key, fid); rc = dt_lookup(env, obj, (struct dt_rec *)&old, (const struct dt_key *)key, BYPASS_CAPA); @@ -542,14 +664,14 @@ log: (__u32)flags, (__u32)old, (__u32)new, rc); unlock: - up_write(&com->lc_sem); + mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); return rc; } -static int lfsck_namespace_check_exist(const struct lu_env *env, - struct dt_object *dir, - struct dt_object *obj, const char *name) +int lfsck_namespace_check_exist(const struct lu_env *env, + struct dt_object *dir, + struct dt_object *obj, const char *name) { struct lu_fid *fid = &lfsck_env_info(env)->lti_fid; int rc; @@ -792,17 +914,17 @@ static 
int lfsck_namespace_insert_orphan(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lu_fid *tfid = &info->lti_fid5; struct lu_attr *la = &info->lti_la3; const struct lu_fid *cfid = lfsck_dto2fid(orphan); const struct lu_fid *pfid; + struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_device *dev = lfsck->li_bottom; struct dt_object *parent; struct thandle *th = NULL; struct lustre_handle plh = { 0 }; struct lustre_handle clh = { 0 }; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; int namelen; int idx = 0; @@ -811,12 +933,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, ENTRY; cname->ln_name = NULL; - /* Create .lustre/lost+found/MDTxxxx when needed. */ - if (unlikely(lfsck->li_lpf_obj == NULL)) { - rc = lfsck_create_lpf(env, lfsck); - if (rc != 0) - GOTO(log, rc); - } + if (unlikely(lfsck->li_lpf_obj == NULL)) + GOTO(log, rc = -ENXIO); parent = lfsck->li_lpf_obj; pfid = lfsck_dto2fid(parent); @@ -830,13 +948,13 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, do { namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d", PFID(cfid), infix, type, idx++); - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)info->lti_key, BYPASS_CAPA); if (rc != 0 && rc != -ENOENT) GOTO(log, rc); - if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid))) + if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid))) exist = true; } while (rc == 0 && !exist); @@ -954,10 +1072,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, } } - if (rc != 0) - GOTO(unlock, rc); - - rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA); + if (rc == 0) + rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA); GOTO(stop, rc = (rc == 0 ? 
1 : rc)); @@ -1113,116 +1229,7 @@ log: } /** - * Create the specified orphan MDT-object on remote MDT. - * - * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to - * ask the remote LFSCK instance to create the specified orphan object - * under .lustre/lost+found/MDTxxxx/ directory with the name: - * ${FID}-P-${conflict_version}. - * - * \param[in] env pointer to the thread context - * \param[in] com pointer to the lfsck component - * \param[in] orphan pointer to the orphan MDT-object - * \param[in] type the orphan's type to be created - * - * type "P": The orphan object to be created was a parent directory - * of some MDT-object which linkEA shows that the @orphan - * object is missing. - * - * \see lfsck_layout_recreate_parent() for more types. - * - * \param[in] lmv pointer to master LMV EA that will be set to the orphan - * - * \retval positive number for repaired cases - * \retval 0 if needs to repair nothing - * \retval negative error number on failure - */ -static int lfsck_namespace_create_orphan_remote(const struct lu_env *env, - struct lfsck_component *com, - struct dt_object *orphan, - __u32 type, - struct lmv_mds_md_v1 *lmv) -{ - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lfsck_request *lr = &info->lti_lr; - struct lu_seq_range *range = &info->lti_range; - const struct lu_fid *fid = lfsck_dto2fid(orphan); - struct lfsck_namespace *ns = com->lc_file_ram; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct seq_server_site *ss = - lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); - struct lfsck_tgt_desc *ltd = NULL; - struct ptlrpc_request *req = NULL; - int rc; - ENTRY; - - if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) - GOTO(out, rc = 1); - - fld_range_set_mdt(range); - rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range); - if (rc != 0) - GOTO(out, rc); - - ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index); - if (ltd == NULL) { - ns->ln_flags |= LF_INCOMPLETE; - - GOTO(out, rc = 
-ENODEV); - } - - req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp), - &RQF_LFSCK_NOTIFY); - if (req == NULL) - GOTO(out, rc = -ENOMEM); - - rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY); - if (rc != 0) { - ptlrpc_request_free(req); - - GOTO(out, rc); - } - - lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST); - memset(lr, 0, sizeof(*lr)); - lr->lr_event = LE_CREATE_ORPHAN; - lr->lr_index = lfsck_dev_idx(lfsck->li_bottom); - lr->lr_active = LFSCK_TYPE_NAMESPACE; - lr->lr_fid = *fid; - lr->lr_type = type; - if (lmv != NULL) { - lr->lr_hash_type = lmv->lmv_hash_type; - lr->lr_stripe_count = lmv->lmv_stripe_count; - lr->lr_layout_version = lmv->lmv_layout_version; - memcpy(lr->lr_pool_name, lmv->lmv_pool_name, - sizeof(lr->lr_pool_name)); - } - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - - if (rc == 0) - rc = 1; - else if (rc == -EEXIST) - rc = 0; - - GOTO(out, rc); - -out: - CDEBUG(D_LFSCK, "%s: namespace LFSCK create object " - DFID" on the MDT %x remotely: rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(fid), - ltd != NULL ? ltd->ltd_index : -1, rc); - - if (ltd != NULL) - lfsck_tgt_put(ltd); - - return rc; -} - -/** - * Create the specified orphan MDT-object locally. + * Create the specified orphan directory. * * For the case that the parent MDT-object stored in some MDT-object's * linkEA entry is lost, the LFSCK will re-create the parent object as @@ -1232,24 +1239,15 @@ out: * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * \param[in] orphan pointer to the orphan MDT-object to be created - * \param[in] type the orphan's type to be created - * - * type "P": The orphan object to be created was a parent directory - * of some MDT-object which linkEA shows that the @orphan - * object is missing. - * - * \see lfsck_layout_recreate_parent() for more types. 
- * * \param[in] lmv pointer to master LMV EA that will be set to the orphan * * \retval positive number for repaired cases * \retval negative error number on failure */ -static int lfsck_namespace_create_orphan_local(const struct lu_env *env, - struct lfsck_component *com, - struct dt_object *orphan, - __u32 type, - struct lmv_mds_md_v1 *lmv) +static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *orphan, + struct lmv_mds_md_v1 *lmv) { struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_attr *la = &info->lti_la; @@ -1257,17 +1255,17 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, struct dt_object_format *dof = &info->lti_dof; struct lu_name *cname = &info->lti_name2; struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lu_fid *tfid = &info->lti_fid; struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; const struct lu_fid *cfid = lfsck_dto2fid(orphan); - const struct lu_fid *pfid; + struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct lfsck_namespace *ns = com->lc_file_ram; + struct dt_device *dev; struct dt_object *parent = NULL; struct dt_object *child = NULL; struct thandle *th = NULL; struct lustre_handle lh = { 0 }; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; struct lu_buf lmv_buf; char name[32]; @@ -1277,26 +1275,45 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, ENTRY; LASSERT(!dt_object_exists(orphan)); - LASSERT(!dt_object_remote(orphan)); - - /* @orphan maybe not attached to lfsck->li_bottom */ - child = lfsck_object_find_by_dev(env, dev, cfid); - if (IS_ERR(child)) - GOTO(log, rc = PTR_ERR(child)); cname->ln_name = NULL; if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(log, rc = 1); - /* Create .lustre/lost+found/MDTxxxx when needed. 
*/ - if (unlikely(lfsck->li_lpf_obj == NULL)) { - rc = lfsck_create_lpf(env, lfsck); + if (dt_object_remote(orphan)) { + LASSERT(lfsck->li_lpf_root_obj != NULL); + + idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid); + if (idx < 0) + GOTO(log, rc = idx); + + snprintf(name, 8, "MDT%04x", idx); + rc = dt_lookup(env, lfsck->li_lpf_root_obj, + (struct dt_rec *)&tfid, + (const struct dt_key *)name, BYPASS_CAPA); if (rc != 0) - GOTO(log, rc); + GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc)); + + parent = lfsck_object_find_bottom(env, lfsck, &tfid); + if (IS_ERR(parent)) + GOTO(log, rc = PTR_ERR(parent)); + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(log, rc = -ENOTDIR); + } else { + if (unlikely(lfsck->li_lpf_obj == NULL)) + GOTO(log, rc = -ENXIO); + + parent = lfsck->li_lpf_obj; } - parent = lfsck->li_lpf_obj; - pfid = lfsck_dto2fid(parent); + dev = lfsck_find_dev_by_fid(env, lfsck, cfid); + if (IS_ERR(dev)) + GOTO(log, rc = PTR_ERR(dev)); + + child = lfsck_object_find_by_dev(env, dev, cfid); + if (IS_ERR(child)) + GOTO(log, rc = PTR_ERR(child)); /* Hold update lock on the parent to prevent others to access. */ rc = lfsck_ibits_lock(env, lfsck, parent, &lh, @@ -1304,10 +1321,11 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, if (rc != 0) GOTO(log, rc); + idx = 0; do { namelen = snprintf(name, 31, DFID"-P-%d", PFID(cfid), idx++); - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)name, BYPASS_CAPA); if (rc != 0 && rc != -ENOENT) GOTO(unlock1, rc); @@ -1317,7 +1335,7 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, cname->ln_namelen = namelen; memset(la, 0, sizeof(*la)); - la->la_mode = type | (S_ISDIR(type) ? 
0700 : 0600); + la->la_mode = S_IFDIR | 0700; la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | LA_ATIME | LA_MTIME | LA_CTIME; @@ -1325,13 +1343,13 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, la->la_mode & S_IFMT); memset(dof, 0, sizeof(*dof)); - dof->dof_type = dt_mode_to_dft(type); + dof->dof_type = dt_mode_to_dft(S_IFDIR); rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); if (rc != 0) GOTO(unlock1, rc); - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent)); if (rc != 0) GOTO(unlock1, rc); @@ -1339,13 +1357,38 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); + /* Sync the remote transaction to guarantee that the subsequent + * lock against the @orphan can find the @orphan in time. */ + if (dt_object_remote(orphan)) + th->th_sync = 1; + rc = dt_declare_create(env, child, la, hint, dof, th); - if (rc == 0 && S_ISDIR(type)) + if (rc != 0) + GOTO(stop, rc); + + if (unlikely(!dt_try_as_dir(env, child))) + GOTO(stop, rc = -ENOTDIR); + + rec->rec_type = S_IFDIR; + rec->rec_fid = cfid; + rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(stop, rc); + + rec->rec_fid = lfsck_dto2fid(parent); + rc = dt_declare_insert(env, child, (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc == 0) rc = dt_declare_ref_add(env, child, th); if (rc != 0) GOTO(stop, rc); + rc = dt_declare_ref_add(env, child, th); + if (rc != 0) + GOTO(stop, rc); + if (lmv != NULL) { lmv->lmv_magic = LMV_MAGIC; lmv->lmv_master_mdt_index = lfsck_dev_idx(dev); @@ -1364,11 +1407,10 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rec->rec_type = type; rec->rec_fid = cfid; rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, (const struct dt_key *)name, th); - if (rc == 0 && S_ISDIR(type)) + 
if (rc == 0) rc = dt_declare_ref_add(env, parent, th); if (rc != 0) @@ -1383,28 +1425,22 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, if (rc != 0) GOTO(unlock2, rc); - if (S_ISDIR(type)) { - if (unlikely(!dt_try_as_dir(env, child))) - GOTO(unlock2, rc = -ENOTDIR); - - rec->rec_type = S_IFDIR; - rec->rec_fid = cfid; - rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, BYPASS_CAPA, 1); - if (rc != 0) - GOTO(unlock2, rc); + rec->rec_fid = cfid; + rc = dt_insert(env, child, (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, BYPASS_CAPA, 1); + if (rc != 0) + GOTO(unlock2, rc); - rec->rec_fid = pfid; - rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, - BYPASS_CAPA, 1); - if (rc != 0) - GOTO(unlock2, rc); + rec->rec_fid = lfsck_dto2fid(parent); + rc = dt_insert(env, child, (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, + BYPASS_CAPA, 1); + if (rc != 0) + GOTO(unlock2, rc); - rc = dt_ref_add(env, child, th); - if (rc != 0) - GOTO(unlock2, rc); - } + rc = dt_ref_add(env, child, th); + if (rc != 0) + GOTO(unlock2, rc); if (lmv != NULL) { rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0, @@ -1419,11 +1455,10 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rec->rec_type = type; rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, (const struct dt_key *)name, th, BYPASS_CAPA, 1); - if (rc == 0 && S_ISDIR(type)) { + if (rc == 0) { dt_write_lock(env, parent, 0); rc = dt_ref_add(env, parent, th); dt_write_unlock(env, parent); @@ -1441,55 +1476,16 @@ unlock1: lfsck_ibits_unlock(&lh, LCK_EX); log: - CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for " - "the object "DFID", name = %s, type %o: rc = %d\n", + CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for " + "the object "DFID", name = %s: rc = %d\n", lfsck_lfsck2name(lfsck), 
PFID(cfid), - cname->ln_name != NULL ? cname->ln_name : "", type, rc); + cname->ln_name != NULL ? cname->ln_name : "", rc); if (child != NULL && !IS_ERR(child)) lfsck_object_put(env, child); - return rc; -} - -/** - * Create the specified orphan MDT-object. - * - * For the case that the parent MDT-object stored in some MDT-object's - * linkEA entry is lost, the LFSCK will re-create the parent object as - * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory - * with the name: ${FID}-P-${conflict_version}. - * - * \param[in] env pointer to the thread context - * \param[in] com pointer to the lfsck component - * \param[in] orphan pointer to the orphan MDT-object - * - * type "P": The orphan object to be created was a parent directory - * of some MDT-object which linkEA shows that the @orphan - * object is missing. - * - * \see lfsck_layout_recreate_parent() for more types. - * - * \param[in] lmv pointer to master LMV EA that will be set to the orphan - * - * \retval positive number for repaired cases - * \retval 0 if needs to repair nothing - * \retval negative error number on failure - */ -static int lfsck_namespace_create_orphan(const struct lu_env *env, - struct lfsck_component *com, - struct dt_object *orphan, - struct lmv_mds_md_v1 *lmv) -{ - struct lfsck_namespace *ns = com->lc_file_ram; - int rc; - - if (dt_object_remote(orphan)) - rc = lfsck_namespace_create_orphan_remote(env, com, orphan, - S_IFDIR, lmv); - else - rc = lfsck_namespace_create_orphan_local(env, com, orphan, - S_IFDIR, lmv); + if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj) + lfsck_object_put(env, parent); if (rc != 0) ns->ln_flags |= LF_INCONSISTENT; @@ -1530,7 +1526,7 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env, struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct thandle *th = NULL; struct lustre_handle lh = { 0 }; - struct linkea_data ldata_new = { 0 }; + struct linkea_data ldata_new = { NULL }; struct lu_buf linkea_buf; 
int rc = 0; ENTRY; @@ -1739,16 +1735,16 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, const struct lu_name *cname) { struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_fid *tfid = &info->lti_fid5; struct lu_attr *la = &info->lti_la; struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lu_fid tfid; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_device *dev = lfsck->li_next; const char *name = cname->ln_name; struct dt_object *obj = NULL; struct lustre_handle plh = { 0 }; struct lustre_handle clh = { 0 }; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct thandle *th = NULL; bool exist = true; int rc = 0; @@ -1780,7 +1776,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, goto replace; } - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)name, BYPASS_CAPA); if (rc == -ENOENT) { exist = false; @@ -1791,7 +1787,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, GOTO(log, rc); /* Someone changed the name entry, cannot replace it. */ - if (!lu_fid_eq(cfid, tfid)) + if (!lu_fid_eq(cfid, &tfid)) GOTO(log, rc = 0); /* lock the object to be destroyed. 
*/ @@ -1929,8 +1925,6 @@ int lfsck_namespace_rebuild_linkea(const struct lu_env *env, int rc = 0; ENTRY; - LASSERT(!dt_object_remote(obj)); - th = dt_trans_create(env, dev); if (IS_ERR(th)) GOTO(log, rc = PTR_ERR(th)); @@ -2006,15 +2000,14 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, const char *name, const char *name2, __u16 type, bool update, bool dec) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct dt_insert_rec *rec = &info->lti_dt_rec; - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct lu_fid *tfid = &info->lti_fid5; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_next; - struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; - int rc = 0; + struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct lu_fid tfid; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + int rc = 0; ENTRY; if (unlikely(!dt_try_as_dir(env, parent))) @@ -2054,7 +2047,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, GOTO(stop, rc); dt_write_lock(env, parent, 0); - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)name, BYPASS_CAPA); /* Someone has removed the bad name entry by race. */ if (rc == -ENOENT) @@ -2065,7 +2058,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, /* Someone has removed the bad name entry and reused it for other * object by race. 
*/ - if (!lu_fid_eq(tfid, cfid)) + if (!lu_fid_eq(&tfid, cfid)) GOTO(unlock2, rc = 0); if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) @@ -2156,7 +2149,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_device *dev = lfsck->li_bottom; struct thandle *th = NULL; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; int rc = 0; ENTRY; @@ -2339,7 +2332,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; const struct lu_fid *cfid = lfsck_dto2fid(child); - struct lu_fid *tfid = &info->lti_fid3; + struct lu_fid tfid; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_object *parent = NULL; @@ -2347,9 +2340,9 @@ lfsck_namespace_dsd_single(const struct lu_env *env, int rc = 0; ENTRY; - lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key); + lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key); /* The unique linkEA entry with bad parent will be handled as orphan. */ - if (!fid_is_sane(tfid)) { + if (!fid_is_sane(&tfid)) { if (!lustre_handle_is_used(lh) && retry != NULL) *retry = true; else @@ -2359,7 +2352,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, GOTO(out, rc); } - parent = lfsck_object_find_bottom(env, lfsck, tfid); + parent = lfsck_object_find_bottom(env, lfsck, &tfid); if (IS_ERR(parent)) GOTO(out, rc = PTR_ERR(parent)); @@ -2401,7 +2394,7 @@ lost_parent: /* It is an invalid name entry, we * cannot trust the parent also. */ rc = lfsck_namespace_shrink_linkea(env, com, child, - ldata, cname, tfid, true); + ldata, cname, &tfid, true); if (rc < 0) GOTO(out, rc); @@ -2414,7 +2407,7 @@ lost_parent: } /* Create the lost parent as an orphan. 
*/ - rc = lfsck_namespace_create_orphan(env, com, parent, lmv); + rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv); if (rc >= 0) { /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, @@ -2430,7 +2423,7 @@ lost_parent: * current system to be consistent. */ rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, - cname, tfid, true); + cname, &tfid, true); if (rc >= 0) { snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), @@ -2456,7 +2449,7 @@ lost_parent: GOTO(out, rc); } - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)cname->ln_name, BYPASS_CAPA); if (rc == -ENOENT) { /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT @@ -2486,7 +2479,7 @@ lost_parent: /* It is an invalid name entry, drop it. */ if (unlikely(rc > 0)) { rc = lfsck_namespace_shrink_linkea(env, com, child, - ldata, cname, tfid, true); + ldata, cname, &tfid, true); if (rc >= 0) { snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), @@ -2518,7 +2511,7 @@ lost_parent: * internal status of create operation. Under such * case, nothing to be done. */ rc = lfsck_namespace_shrink_linkea_cond(env, com, - parent, child, ldata, cname, tfid); + parent, child, ldata, cname, &tfid); if (rc >= 0) { snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), @@ -2534,7 +2527,7 @@ lost_parent: if (rc != 0) GOTO(out, rc); - if (!lu_fid_eq(tfid, cfid)) { + if (!lu_fid_eq(&tfid, cfid)) { if (!lustre_handle_is_used(lh) && retry != NULL) { *retry = true; @@ -2546,7 +2539,7 @@ lost_parent: * may be created by the LFSCK for repairing dangling * name entry. Try to replace it. */ rc = lfsck_namespace_replace_cond(env, com, parent, child, - tfid, cname); + &tfid, cname); if (rc == 0) rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh, type); @@ -2554,6 +2547,9 @@ lost_parent: GOTO(out, rc); } + if (fid_is_zero(pfid)) + GOTO(out, rc = 0); + /* The ".." 
name entry is wrong, update it. */ if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) { if (!lustre_handle_is_used(lh) && retry != NULL) { @@ -2612,27 +2608,29 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env, struct lfsck_thread_info *info = lfsck_env_info(env); struct lu_name *cname = &info->lti_name; const struct lu_fid *cfid = lfsck_dto2fid(child); - struct lu_fid *tfid = &info->lti_fid3; - struct lu_fid *pfid2 = &info->lti_fid4; + struct lu_fid *pfid2 = &info->lti_fid3; + struct lu_fid tfid; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct dt_object *parent = NULL; - struct linkea_data ldata_new = { 0 }; - int count = 0; + struct linkea_data ldata_new = { NULL }; + int dirent_count = 0; + int linkea_count = 0; int rc = 0; bool once = true; ENTRY; again: while (ldata->ld_lee != NULL) { - lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, + lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key); /* Drop repeated linkEA entries. */ - lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true); + lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true); /* Drop invalid linkEA entry. */ - if (!fid_is_sane(tfid)) { + if (!fid_is_sane(&tfid)) { linkea_del_buf(ldata, cname); + linkea_count++; continue; } @@ -2649,12 +2647,12 @@ again: * When the LFSCK runs again, if the dangling name is still * there, the LFSCK should move the orphan directory object * back to the normal namespace. */ - if (!lpf && !lu_fid_eq(pfid, tfid) && once) { + if (!lpf && !lu_fid_eq(pfid, &tfid) && once) { linkea_next_entry(ldata); continue; } - parent = lfsck_object_find_bottom(env, lfsck, tfid); + parent = lfsck_object_find_bottom(env, lfsck, &tfid); if (IS_ERR(parent)) RETURN(PTR_ERR(parent)); @@ -2666,6 +2664,7 @@ again: * child to be visible via other parent, then * remove this linkEA entry. 
*/ linkea_del_buf(ldata, cname); + linkea_count++; continue; } @@ -2676,10 +2675,11 @@ again: if (unlikely(!dt_try_as_dir(env, parent))) { lfsck_object_put(env, parent); linkea_del_buf(ldata, cname); + linkea_count++; continue; } - rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)cname->ln_name, BYPASS_CAPA); *pfid2 = *lfsck_dto2fid(parent); @@ -2695,7 +2695,7 @@ again: RETURN(rc); } - if (lu_fid_eq(tfid, cfid)) { + if (lu_fid_eq(&tfid, cfid)) { lfsck_object_put(env, parent); if (!lu_fid_eq(pfid, pfid2)) { *type = LNIT_UNMATCHED_PAIRS; @@ -2723,17 +2723,18 @@ rebuild: RETURN(rc); linkea_del_buf(ldata, cname); + linkea_count++; linkea_first_entry(ldata); /* There may be some invalid dangling name entries under * other parent directories, remove all of them. */ while (ldata->ld_lee != NULL) { lfsck_namespace_unpack_linkea_entry(ldata, - cname, tfid, info->lti_key); - if (!fid_is_sane(tfid)) + cname, &tfid, info->lti_key); + if (!fid_is_sane(&tfid)) goto next; parent = lfsck_object_find_bottom(env, lfsck, - tfid); + &tfid); if (IS_ERR(parent)) { rc = PTR_ERR(parent); if (rc != -ENOENT && @@ -2759,13 +2760,13 @@ rebuild: goto next; } - count += rc; + dirent_count += rc; next: linkea_del_buf(ldata, cname); } - ns->ln_dirent_repaired += count; + ns->ln_dirent_repaired += dirent_count; RETURN(rc); } @@ -2775,7 +2776,7 @@ next: * created by the LFSCK for repairing dangling name entry. * Try to replace it. 
*/ rc = lfsck_namespace_replace_cond(env, com, parent, child, - tfid, cname); + &tfid, cname); lfsck_object_put(env, parent); if (rc < 0) RETURN(rc); @@ -2786,10 +2787,15 @@ next: linkea_del_buf(ldata, cname); } + linkea_first_entry(ldata); if (ldata->ld_leh->leh_reccount == 1) { rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata, lh, type, NULL); + if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0) + rc = lfsck_namespace_rebuild_linkea(env, com, child, + ldata); + RETURN(rc); } @@ -2802,7 +2808,6 @@ next: RETURN(rc); } - linkea_first_entry(ldata); /* If the dangling name entry for the orphan directory object has * been remvoed, then just check whether the directory object is * still under the .lustre/lost+found/MDTxxxx/ or not. */ @@ -2834,7 +2839,7 @@ next: * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * \param[in] obj pointer to the dt_object to be handled - * \param[in,out] nlink pointer to buffer to object's hard lock count before + * \param[in,out] la pointer to buffer to object's attribute before * and after the repairing * * \retval positive number for repaired cases @@ -2843,10 +2848,10 @@ next: */ static int lfsck_namespace_repair_nlink(const struct lu_env *env, struct lfsck_component *com, - struct dt_object *obj, __u32 *nlink) + struct dt_object *obj, + struct lu_attr *la) { struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_attr *la = &info->lti_la3; struct lu_fid *tfid = &info->lti_fid3; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; @@ -2854,9 +2859,10 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, const struct lu_fid *cfid = lfsck_dto2fid(obj); struct dt_object *child = NULL; struct thandle *th = NULL; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lustre_handle lh = { 0 }; - __u32 old = *nlink; + __u32 old = la->la_nlink; + int idx; int rc = 0; __u8 flags; ENTRY; @@ 
-2899,25 +2905,29 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, GOTO(unlock, rc = 0); fid_cpu_to_be(tfid, cfid); - rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags, - (const struct dt_key *)tfid, BYPASS_CAPA); + idx = lfsck_sub_trace_file_fid2idx(cfid); + rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj, + (struct dt_rec *)&flags, (const struct dt_key *)tfid, + BYPASS_CAPA); if (rc != 0) GOTO(unlock, rc); if (flags & LNTF_SKIP_NLINK) GOTO(unlock, rc = 0); - rc = lfsck_links_read2(env, child, &ldata); - if (rc == -ENODATA) - GOTO(unlock, rc = 0); + rc = dt_attr_get(env, child, la, BYPASS_CAPA); + if (rc != 0) + GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc)); + rc = lfsck_links_read2(env, child, &ldata); if (rc != 0) - GOTO(unlock, rc); + GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc)); - if (*nlink == ldata.ld_leh->leh_reccount) + if (la->la_nlink == ldata.ld_leh->leh_reccount || + unlikely(la->la_nlink == 0)) GOTO(unlock, rc = 0); - la->la_nlink = *nlink = ldata.ld_leh->leh_reccount; + la->la_nlink = ldata.ld_leh->leh_reccount; if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 1); @@ -2938,7 +2948,7 @@ log: CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s " "nlink count from %u to %u: rc = %d\n", - lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc); + lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc); if (rc != 0) ns->ln_flags |= LF_INCONSISTENT; @@ -2993,7 +3003,7 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env, struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct lustre_handle lh = { 0 }; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; bool unknown = false; bool lpf = false; bool retry = false; @@ -3003,6 +3013,22 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env, LASSERT(!dt_object_remote(child)); + if (flags & LNTF_UNCERTAIN_LMV) { + if (flags & 
LNTF_RECHECK_NAME_HASH) { + rc = lfsck_namespace_scan_shard(env, com, child); + if (rc < 0) + RETURN(rc); + + ns->ln_striped_shards_scanned++; + } else { + ns->ln_striped_shards_skipped++; + } + } + + flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV); + if (flags == 0) + RETURN(0); + if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) && !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the" @@ -3055,6 +3081,8 @@ lock: } else if (lfsck->li_lpf_obj != NULL && lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) { lpf = true; + } else if (unlikely(!fid_is_sane(pfid))) { + fid_zero(pfid); } rc = lfsck_links_read(env, child, &ldata); @@ -3207,7 +3235,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace *ns = com->lc_file_ram; struct dt_object *parent = NULL; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; bool repaired = false; int count = 0; int rc; @@ -3298,7 +3326,7 @@ lost_parent: } /* Create the lost parent as an orphan. */ - rc = lfsck_namespace_create_orphan(env, com, + rc = lfsck_namespace_create_orphan_dir(env, com, parent, NULL); if (rc < 0) { lfsck_object_put(env, parent); @@ -3498,37 +3526,38 @@ out: count = ldata.ld_leh->leh_reccount; } - /* If the LFSCK is marked as LF_INCOMPLETE, then means some - * MDT has ever tried to verify some remote MDT-object that - * resides on this MDT, but this MDT failed to respond such - * request. So means there may be some remote name entry on - * other MDT that references this object with another name, - * so we cannot know whether this linkEA is valid or not. - * So keep it there and maybe resolved when next LFSCK run. */ - if (count == 0 && !(ns->ln_flags & LF_INCOMPLETE)) { - /* If the child becomes orphan, then insert it into - * the global .lustre/lost+found/MDTxxxx directory. 
*/ - rc = lfsck_namespace_insert_orphan(env, com, child, "", "O", - &count); - if (rc < 0) - return rc; + if (count == 0) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some + * MDT has ever tried to verify some remote MDT-object that + * resides on this MDT, but this MDT failed to respond such + * request. So means there may be some remote name entry on + * other MDT that references this object with another name, + * so we cannot know whether this linkEA is valid or not. + * So keep it there and maybe resolved when next LFSCK run. */ + if (!(ns->ln_flags & LF_INCOMPLETE)) { + /* If the child becomes orphan, then insert it into + * the global .lustre/lost+found/MDTxxxx directory. */ + rc = lfsck_namespace_insert_orphan(env, com, child, + "", "O", &count); + if (rc < 0) + return rc; - if (rc > 0) { - ns->ln_mul_ref_repaired++; - repaired = true; + if (rc > 0) { + ns->ln_mul_ref_repaired++; + repaired = true; + } } - } - - rc = dt_attr_get(env, child, la, BYPASS_CAPA); - if (rc != 0) - return rc; + } else { + rc = dt_attr_get(env, child, la, BYPASS_CAPA); + if (rc != 0) + return rc; - if (la->la_nlink != count) { - rc = lfsck_namespace_repair_nlink(env, com, child, - &la->la_nlink); - if (rc > 0) { - ns->ln_objs_nlink_repaired++; - rc = 0; + if (la->la_nlink != 0 && la->la_nlink != count) { + rc = lfsck_namespace_repair_nlink(env, com, child, la); + if (rc > 0) { + ns->ln_objs_nlink_repaired++; + rc = 0; + } } } @@ -3622,6 +3651,56 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, time_phase2); } +static void lfsck_namespace_release_lmv(const struct lu_env *env, + struct lfsck_component *com) +{ + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + + while (!list_empty(&lfsck->li_list_lmv)) { + struct lfsck_lmv_unit *llu; + struct lfsck_lmv *llmv; + + llu = list_entry(lfsck->li_list_lmv.next, + struct lfsck_lmv_unit, llu_link); + llmv = &llu->llu_lmv; + + LASSERTF(atomic_read(&llmv->ll_ref) == 
1, + "still in using: %u\n", + atomic_read(&llmv->ll_ref)); + + ns->ln_striped_dirs_skipped++; + lfsck_lmv_put(env, llmv); + } +} + +static int lfsck_namespace_check_for_double_scan(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *obj) +{ + struct lu_attr *la = &lfsck_env_info(env)->lti_la; + int rc; + + rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + if (rc != 0) + return rc; + + /* zero-linkEA object may be orphan, but it also maybe because + * of upgrading. Currently, we cannot record it for double scan. + * Because it may cause the LFSCK trace file to be too large. */ + + /* "la_ctime" == 1 means that it has ever been removed from + * backend /lost+found directory but not been added back to + * the normal namespace yet. */ + + if ((S_ISREG(lfsck_object_type(obj)) && la->la_nlink > 1) || + unlikely(la->la_ctime == 1)) + rc = lfsck_namespace_trace_update(env, com, lfsck_dto2fid(obj), + LNTF_CHECK_LINKEA, true); + + return rc; +} + /* namespace APIs */ static int lfsck_namespace_reset(const struct lu_env *env, @@ -3656,29 +3735,22 @@ static int lfsck_namespace_reset(const struct lu_env *env, ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; - rc = local_object_unlink(env, lfsck->li_bottom, root, - lfsck_namespace_name); - if (rc != 0) - GOTO(out, rc); - lfsck_object_put(env, com->lc_obj); com->lc_obj = NULL; - dto = local_index_find_or_create(env, lfsck->li_los, root, - lfsck_namespace_name, - S_IFREG | S_IRUGO | S_IWUSR, - &dt_lfsck_features); + dto = lfsck_namespace_load_one_trace_file(env, com, root, + LFSCK_NAMESPACE, NULL, true); if (IS_ERR(dto)) GOTO(out, rc = PTR_ERR(dto)); com->lc_obj = dto; - rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features); + rc = lfsck_namespace_load_sub_trace_files(env, com, true); if (rc != 0) GOTO(out, rc); lad->lad_incomplete = 0; CFS_RESET_BITMAP(lad->lad_bitmap); - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, true); GOTO(out, rc); @@ -3815,7 
+3887,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, false); up_write(&com->lc_sem); log: @@ -3937,14 +4009,13 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; const struct lu_fid *fid = lfsck_dto2fid(obj); - struct lu_attr *la = &info->lti_la; struct lu_fid *pfid = &info->lti_fid2; struct lu_name *cname = &info->lti_name; struct lu_seq_range *range = &info->lti_range; struct dt_device *dev = lfsck->li_bottom; struct seq_server_site *ss = lu_site2seq(dev->dd_lu_dev.ld_site); - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; __u32 idx = lfsck_dev_idx(dev); int rc; ENTRY; @@ -3973,23 +4044,8 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, GOTO(out, rc = (rc == -ENOENT ? 0 : rc)); } - /* zero-linkEA object may be orphan, but it also maybe because - * of upgrading. Currently, we cannot record it for double scan. - * Because it may cause the LFSCK trace file to be too large. */ if (rc == -ENODATA) { - if (S_ISDIR(lfsck_object_type(obj))) - GOTO(out, rc = 0); - - rc = dt_attr_get(env, obj, la, BYPASS_CAPA); - if (rc != 0) - GOTO(out, rc); - - /* "la_ctime" == 1 means that it has ever been removed from - * backend /lost+found directory but not been added back to - * the normal namespace yet. 
*/ - if (la->la_nlink > 1 || unlikely(la->la_ctime == 1)) - rc = lfsck_namespace_trace_update(env, com, fid, - LNTF_CHECK_LINKEA, true); + rc = lfsck_namespace_check_for_double_scan(env, com, obj); GOTO(out, rc); } @@ -4015,24 +4071,12 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, rc = fld_local_lookup(env, ss->ss_server_fld, fid_seq(pfid), range); if ((rc == -ENOENT) || - (rc == 0 && range->lsr_index != idx)) { + (rc == 0 && range->lsr_index != idx)) rc = lfsck_namespace_trace_update(env, com, fid, LNTF_CHECK_LINKEA, true); - } else { - if (S_ISDIR(lfsck_object_type(obj))) - GOTO(out, rc = 0); - - rc = dt_attr_get(env, obj, la, BYPASS_CAPA); - if (rc != 0) - GOTO(out, rc); - - /* "la_ctime" == 1 means that it has ever been - * removed from backend /lost+found directory but - * not been added back to the normal namespace yet. */ - if (la->la_nlink > 1 || unlikely(la->la_ctime == 1)) - rc = lfsck_namespace_trace_update(env, com, - fid, LNTF_CHECK_LINKEA, true); - } + else + rc = lfsck_namespace_check_for_double_scan(env, com, + obj); } GOTO(out, rc); @@ -4053,9 +4097,28 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env, struct lfsck_component *com, struct lu_dirent *ent, __u16 type) { - struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace_req *lnr; - bool wakeup = false; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct ptlrpc_thread *mthread = &lfsck->li_thread; + struct ptlrpc_thread *athread = &lad->lad_thread; + struct l_wait_info lwi = { 0 }; + bool wakeup = false; + + l_wait_event(mthread->t_ctl_waitq, + bk->lb_async_windows == 0 || + lad->lad_prefetched < bk->lb_async_windows || + !thread_is_running(mthread) || + thread_is_stopped(athread), + &lwi); + + if (unlikely(!thread_is_running(mthread)) || + thread_is_stopped(athread)) + return 0; + + if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir))) + 
return 0; lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type); if (IS_ERR(lnr)) { @@ -4100,6 +4163,8 @@ static int lfsck_namespace_post(const struct lu_env *env, lfsck_post_generic(env, com, &result); down_write(&com->lc_sem); + lfsck_namespace_release_lmv(env, com); + spin_lock(&lfsck->li_lock); if (!init) ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint; @@ -4133,7 +4198,7 @@ static int lfsck_namespace_post(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, false); up_write(&com->lc_sem); CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n", @@ -4205,7 +4270,8 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, lfsck->li_time_last_checkpoint; __u64 checked = ns->ln_items_checked + com->lc_new_checked; __u64 speed = checked; - __u64 new_checked = com->lc_new_checked * HZ; + __u64 new_checked = com->lc_new_checked * + msecs_to_jiffies(MSEC_PER_SEC); __u32 rtime = ns->ln_run_time_phase1 + cfs_duration_sec(duration + HALF_SEC); @@ -4259,7 +4325,8 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com, com->lc_new_checked; __u64 speed1 = ns->ln_items_checked; __u64 speed2 = checked; - __u64 new_checked = com->lc_new_checked * HZ; + __u64 new_checked = com->lc_new_checked * + msecs_to_jiffies(MSEC_PER_SEC); __u32 rtime = ns->ln_run_time_phase2 + cfs_duration_sec(duration + HALF_SEC); @@ -4348,6 +4415,7 @@ static void lfsck_namespace_data_release(const struct lu_env *env, LASSERT(list_empty(&lad->lad_req_list)); com->lc_data = NULL; + lfsck_namespace_release_lmv(env, com); spin_lock(<ds->ltd_lock); list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list, @@ -4386,6 +4454,8 @@ static void lfsck_namespace_quit(const struct lu_env *env, thread_is_stopped(&lad->lad_thread)); LASSERT(list_empty(&lad->lad_req_list)); + lfsck_namespace_release_lmv(env, com); + spin_lock(<ds->ltd_lock); list_for_each_entry_safe(ltd, 
next, &lad->lad_mdt_phase1_list, ltd_namespace_phase_list) { @@ -4413,81 +4483,51 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, ENTRY; switch (lr->lr_event) { - case LE_CREATE_ORPHAN: { - struct dt_object *orphan = NULL; - struct lmv_mds_md_v1 *lmv; - - CDEBUG(D_LFSCK, "%s: namespace LFSCK handling notify from " - "MDT %x to create orphan"DFID" with type %o\n", - lfsck_lfsck2name(lfsck), lr->lr_index, - PFID(&lr->lr_fid), lr->lr_type); - - orphan = lfsck_object_find(env, lfsck, &lr->lr_fid); - if (IS_ERR(orphan)) - GOTO(out_create, rc = PTR_ERR(orphan)); - - if (dt_object_exists(orphan)) - GOTO(out_create, rc = -EEXIST); - - if (lr->lr_stripe_count > 0) { - lmv = &lfsck_env_info(env)->lti_lmv; - memset(lmv, 0, sizeof(*lmv)); - lmv->lmv_hash_type = lr->lr_hash_type; - lmv->lmv_stripe_count = lr->lr_stripe_count; - lmv->lmv_layout_version = lr->lr_layout_version; - memcpy(lmv->lmv_pool_name, lr->lr_pool_name, - sizeof(lmv->lmv_pool_name)); - } else { - lmv = NULL; - } - - rc = lfsck_namespace_create_orphan_local(env, com, orphan, - lr->lr_type, lmv); - - GOTO(out_create, rc = (rc == 1) ? 
0 : rc); - -out_create: - CDEBUG(D_LFSCK, "%s: namespace LFSCK handled notify from " - "MDT %x to create orphan"DFID" with type %o: rc = %d\n", - lfsck_lfsck2name(lfsck), lr->lr_index, - PFID(&lr->lr_fid), lr->lr_type, rc); - - if (orphan != NULL && !IS_ERR(orphan)) - lfsck_object_put(env, orphan); - - return rc; - } case LE_SKIP_NLINK_DECLARE: { - struct dt_object *obj = com->lc_obj; + struct dt_object *obj; struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + int idx; __u8 flags = 0; LASSERT(th != NULL); + idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); + obj = com->lc_sub_trace_objs[idx].lsto_obj; + fid_cpu_to_be(key, &lr->lr_fid); + mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); rc = dt_declare_delete(env, obj, (const struct dt_key *)key, th); if (rc == 0) rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags, (const struct dt_key *)key, th); + mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); RETURN(rc); } case LE_SKIP_NLINK: { - struct dt_object *obj = com->lc_obj; + struct dt_object *obj; struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + int idx; __u8 flags = 0; bool exist = false; ENTRY; LASSERT(th != NULL); + idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); + obj = com->lc_sub_trace_objs[idx].lsto_obj; fid_cpu_to_be(key, &lr->lr_fid); + mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); rc = dt_lookup(env, obj, (struct dt_rec *)&flags, (const struct dt_key *)key, BYPASS_CAPA); if (rc == 0) { - if (flags & LNTF_SKIP_NLINK) + if (flags & LNTF_SKIP_NLINK) { + mutex_unlock( + &com->lc_sub_trace_objs[idx].lsto_mutex); + RETURN(0); + } exist = true; } else if (rc != -ENOENT) { @@ -4508,6 +4548,7 @@ out_create: GOTO(log, rc); log: + mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID " to be skipped for namespace double scan: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc); @@ -4521,6 +4562,28 @@ log: return 0; } + case LE_SET_LMV_MASTER: { + struct 
dt_object *obj; + + obj = lfsck_object_find_by_dev(env, lfsck->li_bottom, + &lr->lr_fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + rc = lfsck_namespace_notify_lmv_master_local(env, com, obj); + lfsck_object_put(env, obj); + + RETURN(rc > 0 ? 0 : rc); + } + case LE_SET_LMV_SLAVE: { + if (!(lr->lr_flags & LEF_RECHECK_NAME_HASH)) + ns->ln_striped_shards_repaired++; + + rc = lfsck_namespace_trace_update(env, com, &lr->lr_fid, + LNTF_RECHECK_NAME_HASH, true); + + RETURN(rc > 0 ? 0 : rc); + } case LE_PHASE1_DONE: case LE_PHASE2_DONE: case LE_PEER_EXIT: @@ -4662,7 +4725,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; struct dt_object *parent = lnr->lnr_obj; const struct lu_name *cname; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lustre_handle lh = { 0 }; struct lu_buf linkea_buf; struct lu_buf lmv_buf; @@ -4869,7 +4932,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct lfsck_namespace *ns = com->lc_file_ram; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; const struct lu_name *cname; struct thandle *handle = NULL; struct lfsck_namespace_req *lnr = @@ -4887,10 +4950,11 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, bool bad_hash = false; int idx = 0; int count = 0; - int rc; + int rc = 0; enum lfsck_namespace_inconsistency_type type = LNIT_NONE; ENTRY; + la->la_nlink = 0; if (lnr->lnr_attr & LUDA_UPGRADE) { ns->ln_flags |= LF_UPGRADE; ns->ln_dirent_repaired++; @@ -4911,6 +4975,24 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, GOTO(out, rc); } + if (unlikely(!fid_is_sane(&lnr->lnr_fid))) { + CDEBUG(D_LFSCK, "%s: dir scan find invalid FID "DFID + " for the name entry %.*s under "DFID"\n", + lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), + 
lnr->lnr_namelen, lnr->lnr_name, PFID(pfid)); + + if (strcmp(lnr->lnr_name, dotdot) != 0) + /* invalid FID means bad name entry, remove it. */ + type = LNIT_BAD_DIRENT; + else + /* If the parent FID is invalid, we cannot remove + * the ".." entry directly. */ + rc = lfsck_namespace_trace_update(env, com, pfid, + LNTF_CHECK_PARENT, true); + + GOTO(out, rc); + } + if (unlikely(lnr->lnr_dir_cookie == MDS_DIR_END_OFF)) { rc = lfsck_namespace_striped_dir_rescan(env, com, lnr); @@ -4921,6 +5003,12 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid)))) GOTO(out, rc = 0); + if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) { + rc = lfsck_namespace_handle_striped_master(env, com, lnr); + + RETURN(rc); + } + idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid); if (idx < 0) GOTO(out, rc = idx); @@ -5038,7 +5126,7 @@ again: type = LNIT_BAD_TYPE; } - goto record; + goto stop; } ns->ln_flags |= LF_INCONSISTENT; @@ -5100,7 +5188,7 @@ nodata: ns->ln_linkea_repaired++; repaired = true; log = true; - goto record; + goto stop; } if (!lustre_handle_is_used(&lh)) @@ -5168,35 +5256,6 @@ nodata: GOTO(stop, rc); } -record: - LASSERT(count > 0); - - rc = dt_attr_get(env, obj, la, BYPASS_CAPA); - if (rc != 0) - GOTO(stop, rc); - - if ((count == 1 && la->la_nlink == 1) || - S_ISDIR(lfsck_object_type(obj))) - /* Usually, it is for single linked object or dir, do nothing.*/ - GOTO(stop, rc); - - /* Following modification will be in another transaction. 
*/ - if (handle != NULL) { - dt_write_unlock(env, obj); - dtlocked = false; - - dt_trans_stop(env, dev, handle); - handle = NULL; - - lfsck_ibits_unlock(&lh, LCK_EX); - } - - ns->ln_mul_linked_checked++; - rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid, - LNTF_CHECK_LINKEA, true); - - GOTO(out, rc); - stop: if (dtlocked) dt_write_unlock(env, obj); @@ -5245,6 +5304,9 @@ out: default: break; } + + if (count == 1 && S_ISREG(lfsck_object_type(obj))) + dt_attr_get(env, obj, la, BYPASS_CAPA); } down_write(&com->lc_sem); @@ -5311,9 +5373,12 @@ out: false); } - rc = 0; } + + if (count > 1 || la->la_nlink > 1) + ns->ln_mul_linked_checked++; + up_write(&com->lc_sem); if (obj != NULL && !IS_ERR(obj)) @@ -5358,10 +5423,11 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, struct lu_fid *key = &info->lti_fid; struct lu_attr *la = &info->lti_la; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_object *obj = com->lc_obj; + struct dt_object *obj; struct dt_device *dev = lfsck->li_bottom; struct dt_object *child = NULL; struct thandle *th = NULL; + int idx; int rc = 0; __u8 flags = 0; bool exist = false; @@ -5374,6 +5440,8 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env, LASSERT(dt_object_exists(child)); LASSERT(!dt_object_remote(child)); + idx = lfsck_sub_trace_file_fid2idx(&ent->lde_fid); + obj = com->lc_sub_trace_objs[idx].lsto_obj; fid_cpu_to_be(key, &ent->lde_fid); rc = dt_lookup(env, obj, (struct dt_rec *)&flags, (const struct dt_key *)key, BYPASS_CAPA); @@ -5539,19 +5607,9 @@ static void lfsck_namespace_scan_local_lpf(const struct lu_env *env, rc = 0; while (rc == 0) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) && - cfs_fail_val > 0) { - struct l_wait_info lwi; - - lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), - NULL, NULL); - l_wait_event(thread->t_ctl_waitq, - !thread_is_running(thread), - &lwi); - - if (unlikely(!thread_is_running(thread))) - break; - } + if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, 
cfs_fail_val) && + unlikely(!thread_is_running(thread))) + break; rc = iops->rec(env, di, (struct dt_rec *)ent, LUDA_64BITHASH | LUDA_TYPE); @@ -5648,14 +5706,125 @@ out: lu_object_put(env, &parent->do_lu); } -static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env, - struct lfsck_component *com) +/** + * Rescan the striped directory after the master LMV EA reset. + * + * Sometimes, the master LMV EA of the striped directory maybe lost, so when + * the namespace LFSCK engine scan the striped directory for the first time, + * it will be regarded as a normal directory. As the LFSCK processing, some + * other LFSCK instance on other MDT will find the shard of this striped dir, + * and find that the master MDT-object of the striped directory lost its LMV + * EA, then such remote LFSCK instance will regenerate the master LMV EA and + * notify the LFSCK instance on this MDT to rescan the striped directory. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] llu the lfsck_lmv_unit that contains the striped directory + * to be rescanned. 
+ * + * \retval positive number for success + * \retval 0 for LFSCK stopped/paused + * \retval negative error number on failure + */ +static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_lmv_unit *llu) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_assistant_data *lad = com->lc_data; + struct dt_object *dir; + const struct dt_it_ops *iops; + struct dt_it *di; + struct lu_dirent *ent = + (struct lu_dirent *)info->lti_key; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct ptlrpc_thread *thread = &lfsck->li_thread; + struct lfsck_namespace_req *lnr; + struct lfsck_assistant_req *lar; + int rc; + __u16 type; + ENTRY; + + LASSERT(list_empty(&lad->lad_req_list)); + + lfsck->li_lmv = &llu->llu_lmv; + lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj); + rc = lfsck_open_dir(env, lfsck, 0); + if (rc != 0) + RETURN(rc); + + dir = lfsck->li_obj_dir; + di = lfsck->li_di_dir; + iops = &dir->do_index_ops->dio_it; + do { + rc = iops->rec(env, di, (struct dt_rec *)ent, + lfsck->li_args_dir); + if (rc == 0) + rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir, + &type); + + if (rc != 0) { + if (bk->lb_param & LPF_FAILOUT) + GOTO(out, rc); + + goto next; + } + + if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen)) + goto next; + + lnr = lfsck_namespace_assistant_req_init(lfsck, ent, type); + if (IS_ERR(lnr)) { + if (bk->lb_param & LPF_FAILOUT) + GOTO(out, rc = PTR_ERR(lnr)); + + goto next; + } + + lar = &lnr->lnr_lar; + rc = lfsck_namespace_assistant_handler_p1(env, com, lar); + lfsck_namespace_assistant_req_fini(env, lar); + if (rc != 0 && bk->lb_param & LPF_FAILOUT) + GOTO(out, rc); + + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); + +next: + rc = iops->next(env, di); + } while (rc == 0); + +out: + lfsck_close_dir(env, lfsck, rc); + if (rc <= 0) + RETURN(rc); + + /* The close_dir() may insert a dummy lnr in 
the lad->lad_req_list. */ + if (list_empty(&lad->lad_req_list)) + RETURN(1); + + spin_lock(&lad->lad_lock); + lar = list_entry(lad->lad_req_list.next, struct lfsck_assistant_req, + lar_list); + list_del_init(&lar->lar_list); + spin_unlock(&lad->lad_lock); + + rc = lfsck_namespace_assistant_handler_p1(env, com, lar); + lfsck_namespace_assistant_req_fini(env, lar); + + RETURN(rc == 0 ? 1 : rc); +} + +static int +lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *obj, bool first) { struct lfsck_instance *lfsck = com->lc_lfsck; struct ptlrpc_thread *thread = &lfsck->li_thread; struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct lfsck_namespace *ns = com->lc_file_ram; - struct dt_object *obj = com->lc_obj; const struct dt_it_ops *iops = &obj->do_index_ops->dio_it; struct dt_object *target; struct dt_it *di; @@ -5665,47 +5834,40 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env, __u8 flags = 0; ENTRY; - CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n", - lfsck_lfsck2name(lfsck)); - - lfsck_namespace_scan_local_lpf(env, com); - - com->lc_new_checked = 0; - com->lc_new_scanned = 0; - com->lc_time_last_checkpoint = cfs_time_current(); - com->lc_time_next_checkpoint = com->lc_time_last_checkpoint + - cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL); - di = iops->init(env, obj, 0, BYPASS_CAPA); if (IS_ERR(di)) RETURN(PTR_ERR(di)); - fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2); + if (first) + fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2); + else + fid_zero(&fid); rc = iops->get(env, di, (const struct dt_key *)&fid); if (rc < 0) GOTO(fini, rc); - /* Skip the start one, which either has been processed or non-exist. */ - rc = iops->next(env, di); - if (rc != 0) - GOTO(put, rc); + if (first) { + /* The start one either has been processed or does not exist, + * skip it. 
*/ + rc = iops->next(env, di); + if (rc != 0) + GOTO(put, rc); + } do { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) && - cfs_fail_val > 0) { - struct l_wait_info lwi; - - lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), - NULL, NULL); - l_wait_event(thread->t_ctl_waitq, - !thread_is_running(thread), - &lwi); - - if (unlikely(!thread_is_running(thread))) - GOTO(put, rc = 0); - } + if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) && + unlikely(!thread_is_running(thread))) + GOTO(put, rc = 0); key = iops->key(env, di); + if (IS_ERR(key)) { + rc = PTR_ERR(key); + if (rc == -ENOENT) + GOTO(put, rc = 1); + + goto checkpoint; + } + fid_be_to_cpu(&fid, (const struct lu_fid *)key); if (!fid_is_sane(&fid)) { rc = 0; @@ -5734,7 +5896,8 @@ checkpoint: down_write(&com->lc_sem); com->lc_new_checked++; com->lc_new_scanned++; - ns->ln_fid_latest_scanned_phase2 = fid; + if (rc >= 0 && fid_is_sane(&fid)) + ns->ln_fid_latest_scanned_phase2 = fid; if (rc > 0) ns->ln_objs_repaired_phase2++; else if (rc < 0) @@ -5754,7 +5917,7 @@ checkpoint: ns->ln_time_last_checkpoint = cfs_time_current_sec(); ns->ln_objs_checked_phase2 += com->lc_new_checked; com->lc_new_checked = 0; - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, false); up_write(&com->lc_sem); if (rc != 0) GOTO(put, rc); @@ -5780,12 +5943,56 @@ put: fini: iops->fini(env, di); - CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n", - lfsck_lfsck2name(lfsck), rc); - return rc; } +static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env, + struct lfsck_component *com) +{ + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + int rc; + int i; + ENTRY; + + while (!list_empty(&lfsck->li_list_lmv)) { + struct lfsck_lmv_unit *llu; + + spin_lock(&lfsck->li_lock); + llu = list_entry(lfsck->li_list_lmv.next, + struct lfsck_lmv_unit, llu_link); + list_del_init(&llu->llu_link); + spin_unlock(&lfsck->li_lock); + + rc = 
lfsck_namespace_rescan_striped_dir(env, com, llu); + if (rc <= 0) + RETURN(rc); + } + + CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n", + lfsck_lfsck2name(lfsck)); + + lfsck_namespace_scan_local_lpf(env, com); + + com->lc_new_checked = 0; + com->lc_new_scanned = 0; + com->lc_time_last_checkpoint = cfs_time_current(); + com->lc_time_next_checkpoint = com->lc_time_last_checkpoint + + cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL); + + i = lfsck_sub_trace_file_fid2idx(&ns->ln_fid_latest_scanned_phase2); + rc = lfsck_namespace_double_scan_one_trace_file(env, com, + com->lc_sub_trace_objs[i].lsto_obj, true); + while (rc > 0 && ++i < LFSCK_STF_COUNT) + rc = lfsck_namespace_double_scan_one_trace_file(env, com, + com->lc_sub_trace_objs[i].lsto_obj, false); + + CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop at the No. %d " + "trace file: rc = %d\n", lfsck_lfsck2name(lfsck), i, rc); + + RETURN(rc); +} + static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env, struct lfsck_component *com, struct lfsck_position *pos) @@ -5836,7 +6043,7 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, ns->ln_status = LS_FAILED; } - rc = lfsck_namespace_store(env, com); + rc = lfsck_namespace_store(env, com, false); up_write(&com->lc_sem); return rc; @@ -5847,6 +6054,13 @@ lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *args, int rc) { + if (rc == 0) { + struct lfsck_async_interpret_args *laia = args; + struct lfsck_tgt_desc *ltd = laia->laia_ltd; + + ltd->ltd_synced_failures = 1; + } + return 0; } @@ -5883,9 +6097,13 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; struct lfsck_tgt_desc *ltd; struct ptlrpc_request_set *set; + __u32 idx; int rc = 0; ENTRY; + if (!lad->lad_incomplete) + RETURN_EXIT; + set = ptlrpc_prep_set(); if (set == NULL) GOTO(out, rc = -ENOMEM); @@ -5894,25 +6112,12 @@ 
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, memset(laia, 0, sizeof(*laia)); lad->lad_touch_gen++; - spin_lock(&ltds->ltd_lock); - while (!list_empty(&lad->lad_mdt_list)) { - ltd = list_entry(lad->lad_mdt_list.next, - struct lfsck_tgt_desc, - ltd_namespace_list); - if (ltd->ltd_namespace_gen == lad->lad_touch_gen) - break; - - ltd->ltd_namespace_gen = lad->lad_touch_gen; - list_move_tail(&ltd->ltd_namespace_list, - &lad->lad_mdt_list); - if (!lad->lad_incomplete || - !cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) { - ltd->ltd_namespace_failed = 0; - continue; - } + down_read(&ltds->ltd_rw_sem); + cfs_foreach_bit(lad->lad_bitmap, idx) { + ltd = LTD_TGT(ltds, idx); + LASSERT(ltd != NULL); - ltd->ltd_namespace_failed = 1; - spin_unlock(&ltds->ltd_lock); + laia->laia_ltd = ltd; rc = lfsck_async_request(env, ltd->ltd_exp, lr, set, lfsck_namespace_assistant_sync_failures_interpret, laia, LFSCK_NOTIFY); @@ -5920,10 +6125,8 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " "to sync failure with MDT %x: rc = %d\n", lfsck_lfsck2name(lfsck), ltd->ltd_index, rc); - - spin_lock(&ltds->ltd_lock); } - spin_unlock(&ltds->ltd_lock); + up_read(&ltds->ltd_rw_sem); rc = ptlrpc_set_wait(set); ptlrpc_set_destroy(set); @@ -5967,7 +6170,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev, struct dt_object *obj, const struct lu_name *cname, const struct lu_fid *pfid) { - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; struct thandle *th; int rc; @@ -6046,7 +6249,7 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, char *name, struct lu_fid *pfid) { struct lu_name *cname = &lfsck_env_info(env)->lti_name; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; int rc; rc = lfsck_links_read(env, obj, &ldata); @@ -6066,82 +6269,6 @@ int lfsck_links_get_first(const 
struct lu_env *env, struct dt_object *obj, } /** - * Remove the name entry from the parent directory. - * - * No need to care about the object referenced by the name entry, - * either the name entry is invalid or redundant, or the referenced - * object has been processed has been or will be handled by others. - * - * \param[in] env pointer to the thread context - * \param[in] lfsck pointer to the lfsck instance - * \param[in] parent pointer to the lost+found object - * \param[in] name the name for the name entry to be removed - * \param[in] type the type for the name entry to be removed - * - * \retval 0 for success - * \retval negative error number on failure - */ -int lfsck_remove_name_entry(const struct lu_env *env, - struct lfsck_instance *lfsck, - struct dt_object *parent, - const char *name, __u32 type) -{ - struct dt_device *dev = lfsck->li_next; - struct thandle *th; - struct lustre_handle lh = { 0 }; - int rc; - ENTRY; - - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); - if (rc != 0) - RETURN(rc); - - th = dt_trans_create(env, dev); - if (IS_ERR(th)) - GOTO(unlock, rc = PTR_ERR(th)); - - rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th); - if (rc != 0) - GOTO(stop, rc); - - if (S_ISDIR(type)) { - rc = dt_declare_ref_del(env, parent, th); - if (rc != 0) - GOTO(stop, rc); - } - - rc = dt_trans_start(env, dev, th); - if (rc != 0) - GOTO(stop, rc); - - rc = dt_delete(env, parent, (const struct dt_key *)name, th, - BYPASS_CAPA); - if (rc != 0) - GOTO(stop, rc); - - if (S_ISDIR(type)) { - dt_write_lock(env, parent, 0); - rc = dt_ref_del(env, parent, th); - dt_write_unlock(env, parent); - } - - GOTO(stop, rc); - -stop: - dt_trans_stop(env, dev, th); - -unlock: - lfsck_ibits_unlock(&lh, LCK_EX); - - CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s " - "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck), - PFID(lfsck_dto2fid(parent)), name, type, rc); - - return rc; -} - -/** * Update the object's name entry with 
the given FID. * * \param[in] env pointer to the thread context @@ -6236,6 +6363,7 @@ int lfsck_namespace_setup(const struct lu_env *env, struct lfsck_namespace *ns; struct dt_object *root = NULL; struct dt_object *obj; + int i; int rc; ENTRY; @@ -6254,7 +6382,7 @@ int lfsck_namespace_setup(const struct lu_env *env, com->lc_ops = &lfsck_namespace_ops; com->lc_data = lfsck_assistant_data_init( &lfsck_namespace_assistant_ops, - "lfsck_namespace"); + LFSCK_NAMESPACE); if (com->lc_data == NULL) GOTO(out, rc = -ENOMEM); @@ -6267,6 +6395,9 @@ int lfsck_namespace_setup(const struct lu_env *env, if (com->lc_file_disk == NULL) GOTO(out, rc = -ENOMEM); + for (i = 0; i < LFSCK_STF_COUNT; i++) + mutex_init(&com->lc_sub_trace_objs[i].lsto_mutex); + root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid); if (IS_ERR(root)) GOTO(out, rc = PTR_ERR(root)); @@ -6274,23 +6405,20 @@ int lfsck_namespace_setup(const struct lu_env *env, if (unlikely(!dt_try_as_dir(env, root))) GOTO(out, rc = -ENOTDIR); - obj = local_index_find_or_create(env, lfsck->li_los, root, - lfsck_namespace_name, - S_IFREG | S_IRUGO | S_IWUSR, - &dt_lfsck_features); + obj = local_file_find_or_create(env, lfsck->li_los, root, + LFSCK_NAMESPACE, + S_IFREG | S_IRUGO | S_IWUSR); if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); com->lc_obj = obj; - rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features); - if (rc != 0) - GOTO(out, rc); - rc = lfsck_namespace_load(env, com); - if (rc > 0) - rc = lfsck_namespace_reset(env, com, true); - else if (rc == -ENODATA) + if (rc == -ENODATA) rc = lfsck_namespace_init(env, com); + else if (rc < 0) + rc = lfsck_namespace_reset(env, com, true); + else + rc = lfsck_namespace_load_sub_trace_files(env, com, false); if (rc != 0) GOTO(out, rc);