X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=3f9d3d78a4620c6bb4097a0d89e247cc2099c82e;hp=54b34981935550cb8815d3d6f2c83fef95e4b97c;hb=9ff2d957982160103b5d885c9a532ad45bdf8d4d;hpb=3d1ffca751b2970c3223b0f4fd3573428377c66f diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 54b3498..3f9d3d7 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -116,7 +116,6 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, le64_to_cpu(src->ln_objs_repaired_phase2); dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2); dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired); - dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found); fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2, &src->ln_fid_latest_scanned_phase2); dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired); @@ -128,6 +127,11 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, dst->ln_unmatched_pairs_repaired = le64_to_cpu(src->ln_unmatched_pairs_repaired); dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired); + dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired); + dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired); + dst->ln_lost_dirent_repaired = + le64_to_cpu(src->ln_lost_dirent_repaired); + dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size); } static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, @@ -158,7 +162,6 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, cpu_to_le64(src->ln_objs_repaired_phase2); dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2); dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired); - dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found); fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2, &src->ln_fid_latest_scanned_phase2); dst->ln_dirent_repaired = 
cpu_to_le64(src->ln_dirent_repaired); @@ -170,6 +173,11 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, dst->ln_unmatched_pairs_repaired = cpu_to_le64(src->ln_unmatched_pairs_repaired); dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired); + dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired); + dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired); + dst->ln_lost_dirent_repaired = + cpu_to_le64(src->ln_lost_dirent_repaired); + dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size); } static void lfsck_namespace_record_failure(const struct lu_env *env, @@ -194,6 +202,74 @@ static void lfsck_namespace_record_failure(const struct lu_env *env, } /** + * Load the MDT bitmap from the lfsck_namespace tracing file. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * + * \retval 0 for success + * \retval negative error number on failure or data corruption + */ +static int lfsck_namespace_load_bitmap(const struct lu_env *env, + struct lfsck_component *com) +{ + struct dt_object *obj = com->lc_obj; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_namespace *ns = com->lc_file_ram; + cfs_bitmap_t *bitmap = lad->lad_bitmap; + ssize_t size; + __u32 nbits; + int rc; + ENTRY; + + if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size > + ns->ln_bitmap_size) + nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size; + else + nbits = ns->ln_bitmap_size; + + if (unlikely(nbits < BITS_PER_LONG)) + nbits = BITS_PER_LONG; + + if (nbits > bitmap->size) { + __u32 new_bits = bitmap->size; + cfs_bitmap_t *new_bitmap; + + while (new_bits < nbits) + new_bits <<= 1; + + new_bitmap = CFS_ALLOCATE_BITMAP(new_bits); + if (new_bitmap == NULL) + RETURN(-ENOMEM); + + lad->lad_bitmap = new_bitmap; + CFS_FREE_BITMAP(bitmap); + bitmap = new_bitmap; + } + + if (ns->ln_bitmap_size == 0) { + lad->lad_incomplete = 0; + CFS_RESET_BITMAP(bitmap); + + RETURN(0); + } + + size = 
(ns->ln_bitmap_size + 7) >> 3; + rc = dt_xattr_get(env, obj, + lfsck_buf_get(env, bitmap->data, size), + XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA); + if (rc != size) + RETURN(rc >= 0 ? -EINVAL : rc); + + if (cfs_bitmap_check_empty(bitmap)) + lad->lad_incomplete = 0; + else + lad->lad_incomplete = 1; + + RETURN(0); +} + +/** * \retval +ve: the lfsck_namespace is broken, the caller should reset it. * \retval 0: succeed. * \retval -ve: failed cases. @@ -231,17 +307,30 @@ static int lfsck_namespace_load(const struct lu_env *env, } static int lfsck_namespace_store(const struct lu_env *env, - struct lfsck_component *com, bool init) + struct lfsck_component *com) { - struct dt_object *obj = com->lc_obj; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct thandle *handle; - int len = com->lc_file_size; - int rc; + struct dt_object *obj = com->lc_obj; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + cfs_bitmap_t *bitmap = NULL; + struct thandle *handle; + __u32 nbits = 0; + int len = com->lc_file_size; + int rc; ENTRY; + if (lad != NULL) { + bitmap = lad->lad_bitmap; + nbits = bitmap->size; + + LASSERT(nbits > 0); + LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits); + } + + ns->ln_bitmap_size = nbits; lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk, - (struct lfsck_namespace *)com->lc_file_ram); + ns); handle = dt_trans_create(env, lfsck->li_bottom); if (IS_ERR(handle)) GOTO(log, rc = PTR_ERR(handle)); @@ -252,15 +341,26 @@ static int lfsck_namespace_store(const struct lu_env *env, if (rc != 0) GOTO(out, rc); + if (bitmap != NULL) { + rc = dt_declare_xattr_set(env, obj, + lfsck_buf_get(env, bitmap->data, nbits >> 3), + XATTR_NAME_LFSCK_BITMAP, 0, handle); + if (rc != 0) + GOTO(out, rc); + } + rc = dt_trans_start_local(env, lfsck->li_bottom, handle); if (rc != 0) GOTO(out, rc); rc = dt_xattr_set(env, obj, lfsck_buf_get(env, com->lc_file_disk, 
len), - XATTR_NAME_LFSCK_NAMESPACE, - init ? LU_XATTR_CREATE : LU_XATTR_REPLACE, - handle, BYPASS_CAPA); + XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA); + if (rc == 0 && bitmap != NULL) + rc = dt_xattr_set(env, obj, + lfsck_buf_get(env, bitmap->data, nbits >> 3), + XATTR_NAME_LFSCK_BITMAP, 0, handle, + BYPASS_CAPA); GOTO(out, rc); @@ -284,7 +384,7 @@ static int lfsck_namespace_init(const struct lu_env *env, ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; down_write(&com->lc_sem); - rc = lfsck_namespace_store(env, com, true); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); return rc; } @@ -596,32 +696,697 @@ static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata, return repeated; } +/** + * Insert orphan into .lustre/lost+found/MDTxxxx/ locally. + * + * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/ + * with the given type to generate the name, the detailed rules for name + * have been described as following. + * + * The function also generates the linkEA corresponding to the name entry + * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] orphan pointer to the orphan MDT-object + * \param[in] infix additional information for the orphan name, such as + * the FID for original + * \param[in] type the type for describing why the orphan MDT-object is + * created. The rules are as following: + * + * type "D": The MDT-object is a directory, it may knows its parent + * but because there is no valid linkEA, the LFSCK cannot + * know where to put it back to the namespace. + * type "O": The MDT-object has no linkEA, and there is no name + * entry that references the MDT-object. + * + * \see lfsck_layout_recreate_parent() for more types. 
+ *
+ * The orphan name will be like (the @infix is appended to the FID as is):
+ * ${FID}${infix}-${type}-${conflict_version}
+ *
+ * \param[out] count	if someone else added linkEA entries to the orphan in
+ *			the meantime, their count is returned here (may be NULL)
+ *
+ * \retval		positive number if repaired (or would be, in dry-run)
+ * \retval		0 if nothing needs to be repaired
+ * \retval		negative error number on failure
+ */
 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
 					 struct lfsck_component *com,
 					 struct dt_object *orphan,
 					 const char *infix, const char *type,
 					 int *count)
 {
-	/* XXX: TBD */
-	return 0;
+	struct lfsck_thread_info *info	= lfsck_env_info(env);
+	struct lu_name		 *cname	= &info->lti_name;
+	struct dt_insert_rec	 *rec	= &info->lti_dt_rec;
+	struct lu_fid		 *tfid	= &info->lti_fid5;
+	const struct lu_fid	 *cfid	= lfsck_dto2fid(orphan);
+	const struct lu_fid	 *pfid;
+	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
+	struct dt_device	 *dev	= lfsck->li_bottom;
+	struct dt_object	 *parent;
+	struct thandle		 *th	= NULL;
+	struct lustre_handle	  plh	= { 0 };
+	struct lustre_handle	  clh	= { 0 };
+	struct linkea_data	  ldata	= { 0 };
+	struct lu_buf		  linkea_buf;
+	int			  namelen;
+	int			  idx	= 0;
+	int			  rc	= 0;
+	bool			  exist	= false;
+	ENTRY;
+
+	cname->ln_name = NULL;
+	/* Create .lustre/lost+found/MDTxxxx when needed. */
+	if (unlikely(lfsck->li_lpf_obj == NULL)) {
+		rc = lfsck_create_lpf(env, lfsck);
+		if (rc != 0)
+			GOTO(log, rc);
+	}
+
+	parent = lfsck->li_lpf_obj;
+	pfid = lfsck_dto2fid(parent);
+
+	/* Hold the update lock on the parent in EX mode to block other users. */
+	rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
+			      MDS_INODELOCK_UPDATE, LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	do {
+		namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
+				   PFID(cfid), infix, type, idx++);
+		rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+			       (const struct dt_key *)info->lti_key,
+			       BYPASS_CAPA);
+		if (rc != 0 && rc != -ENOENT)
+			GOTO(log, rc);
+
+		if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
+			exist = true;
+	} while (rc == 0 && !exist);
+
+	cname->ln_name = info->lti_key;
+	cname->ln_namelen = namelen;
+	rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	rc = linkea_add_buf(&ldata, cname, pfid);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
+			      MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
+			      LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+		       ldata.ld_leh->leh_len);
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(log, rc = PTR_ERR(th));
+
+	if (S_ISDIR(lfsck_object_type(orphan))) {
+		rc = dt_declare_delete(env, orphan,
+				       (const struct dt_key *)dotdot, th);
+		if (rc != 0)
+			GOTO(stop, rc);
+
+		rec->rec_type = S_IFDIR;
+		rec->rec_fid = pfid;
+		rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
+				       (const struct dt_key *)dotdot, th);
+		if (rc != 0)
+			GOTO(stop, rc);
+	}
+
+	rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
+				  XATTR_NAME_LINK, 0, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	if (!exist) {
+		rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+		rec->rec_fid = cfid;
+		rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+				       (const struct dt_key *)cname->ln_name,
+				       th);
+		if (rc != 0)
+			GOTO(stop, rc);
+
+		if (S_ISDIR(rec->rec_type)) {
+			rc = dt_declare_ref_add(env, parent, th);
+			if (rc != 0)
+				GOTO(stop, rc);
+		}
+	}
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, orphan, 0);
+	rc = lfsck_links_read(env, orphan, &ldata);
+	if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
+		   (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
+		if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+			GOTO(unlock, rc = 1);
+
+		if (S_ISDIR(lfsck_object_type(orphan))) {
+			rc = dt_delete(env, orphan,
+				       (const struct dt_key *)dotdot, th,
+				       BYPASS_CAPA);
+			if (rc != 0)
+				GOTO(unlock, rc);
+
+			rec->rec_type = S_IFDIR;
+			rec->rec_fid = pfid;
+			rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
+				       (const struct dt_key *)dotdot, th,
+				       BYPASS_CAPA, 1);
+			if (rc != 0)
+				GOTO(unlock, rc);
+		}
+
+		rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
+				  th, BYPASS_CAPA);
+	} else {
+		if (rc == 0 && count != NULL)
+			*count = ldata.ld_leh->leh_reccount;
+
+		GOTO(unlock, rc);
+	}
+	dt_write_unlock(env, orphan);
+
+	if (rc == 0 && !exist) {
+		rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+		rec->rec_fid = cfid;
+		rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+			       (const struct dt_key *)cname->ln_name,
+			       th, BYPASS_CAPA, 1);
+		if (rc == 0 && S_ISDIR(rec->rec_type)) {
+			dt_write_lock(env, parent, 0);
+			rc = dt_ref_add(env, parent, th);
+			dt_write_unlock(env, parent);
+		}
+	}
+
+	GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+	dt_write_unlock(env, orphan);
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+log:
+	lfsck_ibits_unlock(&clh, LCK_EX);
+	lfsck_ibits_unlock(&plh, LCK_EX);
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
+	       "object "DFID", name = %s: rc = %d\n",
+	       lfsck_lfsck2name(lfsck), PFID(cfid),
+	       cname->ln_name != NULL ? cname->ln_name : "", rc);
+
+	if (rc != 0) {
+		struct lfsck_namespace *ns = com->lc_file_ram;
+
+		ns->ln_flags |= LF_INCONSISTENT;
+	}
+
+	return rc;
 }
 
+/**
+ * Add the specified name entry back to namespace.
+ *
+ * If there is a linkEA entry that back references a name entry under
+ * some parent directory, but that parent directory does not have the
+ * claimed name entry, and the linkEA entries count is not larger than
+ * the MDT-object's hard link count, then it is quite possible that the
+ * name entry has been lost. In that case the LFSCK should add the name
+ * entry back to the namespace.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ * \param[in] parent	pointer to the directory under which the name entry
+ *			will be inserted into
+ * \param[in] child	pointer to the object referenced by the name entry
+ *			that is to be inserted into the parent
+ * \param[in] name	the name for the child in the parent directory
+ *
+ * \retval		positive number if repaired (or would be, in dry-run)
+ * \retval		0 if nothing to be repaired
+ * \retval		negative error number on failure
+ */
 static int lfsck_namespace_insert_normal(const struct lu_env *env,
 					 struct lfsck_component *com,
 					 struct dt_object *parent,
 					 struct dt_object *child,
 					 const char *name)
 {
-	/* XXX: TBD */
-	return 0;
+	struct lfsck_thread_info *info	= lfsck_env_info(env);
+	struct lu_attr		 *la	= &info->lti_la;
+	struct dt_insert_rec	 *rec	= &info->lti_dt_rec;
+	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
+	struct dt_device	 *dev	= lfsck->li_next;
+	struct thandle		 *th	= NULL;
+	struct lustre_handle	  lh	= { 0 };
+	int			  rc	= 0;
+	ENTRY;
+
+	if (unlikely(!dt_try_as_dir(env, parent)))
+		GOTO(log, rc = -ENOTDIR);
+
+	if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+		GOTO(log, rc = 1);
+
+	/* Hold the update lock on the parent in EX mode to block other users. */
+	rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+			      MDS_INODELOCK_UPDATE, LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(unlock, rc = PTR_ERR(th));
+
+	rec->rec_type = lfsck_object_type(child) & S_IFMT;
+	rec->rec_fid = lfsck_dto2fid(child);
+	rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+			       (const struct dt_key *)name, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	if (S_ISDIR(rec->rec_type)) {
+		rc = dt_declare_ref_add(env, parent, th);
+		if (rc != 0)
+			GOTO(stop, rc);
+	}
+
+	memset(la, 0, sizeof(*la));
+	la->la_ctime = cfs_time_current_sec();
+	la->la_valid = LA_CTIME;
+	rc = dt_declare_attr_set(env, parent, la, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+		       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	if (S_ISDIR(rec->rec_type)) {
+		dt_write_lock(env, parent, 0);
+		rc = dt_ref_add(env, parent, th);
+		dt_write_unlock(env, parent);
+		if (rc != 0)
+			GOTO(stop, rc);
+	}
+
+	la->la_ctime = cfs_time_current_sec();
+	rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
+
+	GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+unlock:
+	lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
+	       "the name %s and type %o to the parent "DFID": rc = %d\n",
+	       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
+	       lfsck_object_type(child) & S_IFMT,
+	       PFID(lfsck_dto2fid(parent)), rc);
+
+	if (rc != 0) {
+		struct lfsck_namespace *ns = com->lc_file_ram;
+
+		ns->ln_flags |= LF_INCONSISTENT;
+		if (rc > 0)
+			ns->ln_lost_dirent_repaired++;
+	}
+
+	return rc;
 }
 
+/**
+ * Create the specified orphan MDT-object on remote MDT.
+ *
+ * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
+ * ask the remote LFSCK instance to create the specified orphan object
+ * under .lustre/lost+found/MDTxxxx/ directory with the name:
+ * ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ * \param[in] orphan	pointer to the orphan MDT-object
+ * \param[in] type	the file type of the orphan, sent to the peer as lr_type
+ *
+ * name type "P":	The orphan object to be created was a parent directory
+ *			of some MDT-object whose linkEA shows that the @orphan
+ *			object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval		positive number if the RPC succeeded (or in dry-run)
+ * \retval		0 if the RPC returned -EEXIST (nothing to repair)
+ * \retval		negative error number on failure (-ENODEV: no target)
+ */
+static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
+						struct lfsck_component *com,
+						struct dt_object *orphan,
+						__u32 type)
+{
+	struct lfsck_thread_info *info	= lfsck_env_info(env);
+	struct lfsck_request	 *lr	= &info->lti_lr;
+	struct lu_seq_range	 *range	= &info->lti_range;
+	const struct lu_fid	 *fid	= lfsck_dto2fid(orphan);
+	struct lfsck_namespace	 *ns	= com->lc_file_ram;
+	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
+	struct seq_server_site	 *ss	=
+			lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
+	struct lfsck_tgt_desc	 *ltd	= NULL;
+	struct ptlrpc_request	 *req	= NULL;
+	int			  rc;
+	ENTRY;
+
+	if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+		GOTO(out, rc = 1);
+
+	fld_range_set_mdt(range);
+	rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+	if (rc != 0)
+		GOTO(out, rc);
+
+	ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
+	if (ltd == NULL) {
+		ns->ln_flags |= LF_INCOMPLETE;
+
+		GOTO(out, rc = -ENODEV);
+	}
+
+	req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
+				   &RQF_LFSCK_NOTIFY);
+	if (req == NULL)
+		GOTO(out, rc = -ENOMEM);
+
+	rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
+	if (rc != 0) {
+		ptlrpc_request_free(req);
+
+		GOTO(out, rc);
+	}
+
+	lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
+	memset(lr, 0, sizeof(*lr));
+	lr->lr_event = LE_CREATE_ORPHAN;
+	lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+	lr->lr_active = LFSCK_TYPE_NAMESPACE;
+	lr->lr_fid = *fid;
+	lr->lr_type = type;
+
+	ptlrpc_request_set_replen(req);
+	rc = ptlrpc_queue_wait(req);
+	ptlrpc_req_finished(req);
+
+	if (rc == 0)
+		rc = 1;
+	else if (rc == -EEXIST)
+		rc = 0;
+
+	GOTO(out, rc);
+
+out:
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
+	       DFID" on the MDT %x remotely: rc = %d\n",
+	       lfsck_lfsck2name(lfsck), PFID(fid),
+	       ltd != NULL ? ltd->ltd_index : -1, rc);
+
+	if (ltd != NULL)
+		lfsck_tgt_put(ltd);
+
+	return rc;
+}
+
+/**
+ * Create the specified orphan MDT-object locally.
+ *
+ * For the case that the parent MDT-object stored in some MDT-object's
+ * linkEA entry is lost, the LFSCK will re-create the parent object as
+ * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
+ * with the name ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ * \param[in] orphan	pointer to the orphan MDT-object to be created
+ * \param[in] type	the file type of the orphan to be created
+ *
+ * name type "P":	The orphan object to be created was a parent directory
+ *			of some MDT-object whose linkEA shows that the @orphan
+ *			object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval		positive number if repaired (or would be, in dry-run)
+ * \retval		negative error number on failure
+ */
+static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
+					       struct lfsck_component *com,
+					       struct dt_object *orphan,
+					       __u32 type)
+{
+	struct lfsck_thread_info	*info	= lfsck_env_info(env);
+	struct lu_attr			*la	= &info->lti_la;
+	struct dt_allocation_hint	*hint	= &info->lti_hint;
+	struct dt_object_format		*dof	= &info->lti_dof;
+	struct lu_name			*cname	= &info->lti_name2;
+	struct dt_insert_rec		*rec	= &info->lti_dt_rec;
+	struct lu_fid			*tfid	= &info->lti_fid;
+	const struct lu_fid		*cfid	= lfsck_dto2fid(orphan);
+	const struct lu_fid		*pfid;
+	struct lfsck_instance		*lfsck	= com->lc_lfsck;
+	struct dt_device		*dev	= lfsck->li_bottom;
+	struct dt_object		*parent	= NULL;
+	struct dt_object		*child	= NULL;
+	struct thandle			*th	= NULL;
+	struct lustre_handle		 lh	= { 0 };
+	struct linkea_data		 ldata	= { 0 };
+	struct lu_buf			 linkea_buf;
+	char				 name[64]; /* 32 may truncate DFID"-P-%d" (assumes usual DFID layout) */
+	int				 namelen;
+	int				 idx	= 0;
+	int				 rc	= 0;
+	ENTRY;
+
+	LASSERT(!dt_object_exists(orphan));
+	LASSERT(!dt_object_remote(orphan));
+
+	cname->ln_name = NULL; /* per-thread scratch, read by "log" on every path */
+	/* @orphan maybe not attached to lfsck->li_bottom */
+	child = lfsck_object_find_by_dev(env, dev, cfid);
+	if (IS_ERR(child))
+		GOTO(log, rc = PTR_ERR(child));
+
+	if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+		GOTO(log, rc = 1);
+
+	/* Create .lustre/lost+found/MDTxxxx when needed. */
+	if (unlikely(lfsck->li_lpf_obj == NULL)) {
+		rc = lfsck_create_lpf(env, lfsck);
+		if (rc != 0)
+			GOTO(log, rc);
+	}
+
+	parent = lfsck->li_lpf_obj;
+	pfid = lfsck_dto2fid(parent);
+
+	/* Hold the update lock on the parent in EX mode to block other users. */
+	rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+			      MDS_INODELOCK_UPDATE, LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	do {
+		namelen = snprintf(name, sizeof(name), DFID"-P-%d",
+				   PFID(cfid), idx++);
+		rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+			       (const struct dt_key *)name, BYPASS_CAPA);
+		if (rc != 0 && rc != -ENOENT)
+			GOTO(unlock1, rc);
+	} while (rc == 0);
+
+	cname->ln_name = name;
+	cname->ln_namelen = namelen;
+
+	memset(la, 0, sizeof(*la));
+	la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
+	la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
+		       LA_ATIME | LA_MTIME | LA_CTIME;
+
+	child->do_ops->do_ah_init(env, hint, parent, child,
+				  la->la_mode & S_IFMT);
+
+	memset(dof, 0, sizeof(*dof));
+	dof->dof_type = dt_mode_to_dft(type);
+
+	rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+	if (rc != 0)
+		GOTO(unlock1, rc);
+
+	rc = linkea_add_buf(&ldata, cname, pfid);
+	if (rc != 0)
+		GOTO(unlock1, rc);
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(unlock1, rc = PTR_ERR(th));
+
+	rc = dt_declare_create(env, child, la, hint, dof, th);
+	if (rc == 0 && S_ISDIR(type))
+		rc = dt_declare_ref_add(env, child, th);
+
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+		       ldata.ld_leh->leh_len);
+	rc = dt_declare_xattr_set(env, child, &linkea_buf,
+				  XATTR_NAME_LINK, 0, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rec->rec_type = type;
+	rec->rec_fid = cfid;
+	rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+			       (const struct dt_key *)name, th);
+	if (rc == 0 && S_ISDIR(type))
+		rc = dt_declare_ref_add(env, parent, th);
+
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, child, 0);
+	rc = dt_create(env, child, la, hint, dof, th);
+	if (rc != 0)
+		GOTO(unlock2, rc);
+
+	if (S_ISDIR(type)) {
+		if (unlikely(!dt_try_as_dir(env, child)))
+			GOTO(unlock2, rc = -ENOTDIR);
+
+		rec->rec_type = S_IFDIR;
+		rec->rec_fid = cfid;
+		rc = dt_insert(env, child, (const struct dt_rec *)rec,
+			       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+
+		rec->rec_fid = pfid;
+		rc = dt_insert(env, child, (const struct dt_rec *)rec,
+			       (const struct dt_key *)dotdot, th,
+			       BYPASS_CAPA, 1);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+
+		rc = dt_ref_add(env, child, th);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+	}
+
+	rc = dt_xattr_set(env, child, &linkea_buf,
+			  XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+	dt_write_unlock(env, child);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rec->rec_type = type;
+	rec->rec_fid = cfid;
+	rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+		       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+	if (rc == 0 && S_ISDIR(type)) {
+		dt_write_lock(env, parent, 0);
+		rc = dt_ref_add(env, parent, th);
+		dt_write_unlock(env, parent);
+	}
+
+	GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+	dt_write_unlock(env, child);
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+unlock1:
+	lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
+	       "the object "DFID", name = %s, type %o: rc = %d\n",
+	       lfsck_lfsck2name(lfsck), PFID(cfid),
+	       cname->ln_name != NULL ? cname->ln_name : "", type, rc);
+
+	if (child != NULL && !IS_ERR(child))
+		lfsck_object_put(env, child);
+
+	return rc;
+}
+
+/**
+ * Create the specified orphan MDT-object.
+ *
+ * For the case that the parent MDT-object stored in some MDT-object's
+ * linkEA entry is lost, the LFSCK will re-create the parent object as
+ * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
+ * with the name: ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ * \param[in] orphan	pointer to the orphan MDT-object
+ *
+ * type "P":		The orphan object to be created was a parent directory
+ *			of some MDT-object which linkEA shows that the @orphan
+ *			object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval		positive number for repaired cases
+ * \retval		0 if nothing needs to be repaired
+ * \retval		negative error number on failure
+ */
 static int lfsck_namespace_create_orphan(const struct lu_env *env,
 					 struct lfsck_component *com,
 					 struct dt_object *orphan)
 {
-	/* XXX: TBD */
-	return 0;
+	struct lfsck_namespace *ns = com->lc_file_ram;
+	int			rc;
+
+	if (dt_object_remote(orphan))
+		rc = lfsck_namespace_create_orphan_remote(env, com, orphan,
+							  S_IFDIR);
+	else
+		rc = lfsck_namespace_create_orphan_local(env, com, orphan,
+							 S_IFDIR);
+
+	if (rc != 0)
+		ns->ln_flags |= LF_INCONSISTENT;
+
+	return rc;
 }
 
 /**
@@ -866,7 +1631,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
 					const struct lu_name *cname)
 {
 	struct lfsck_thread_info *info	= lfsck_env_info(env);
-	struct lu_fid		 *tfid	= &info->lti_fid4;
+	struct lu_fid		 *tfid	= &info->lti_fid5;
 	struct lu_attr		 *la	= &info->lti_la;
 	struct dt_insert_rec	 *rec	= &info->lti_dt_rec;
 	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
@@ -1106,6 +1871,155 @@ log:
 }
 
 /**
+ * Repair invalid name entry.
+ *
+ * If the name entry contains invalid information, such as bad file type
+ * or (and) corrupted object FID, then either remove the name entry or
+ * update the name entry with the given (right) information.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ * \param[in] parent	pointer to the parent directory
+ * \param[in] child	pointer to the object referenced by the name entry
+ * \param[in] name	the old name of the child under the parent directory
+ * \param[in] name2	the new name of the child under the parent directory
+ * \param[in] type	the type claimed by the name entry (debug message only)
+ * \param[in] update	update the name entry if true; otherwise, remove it
+ * \param[in] dec	decrease the parent nlink count if true
+ *
+ * \retval		positive number if repaired (or would be, in dry-run)
+ * \retval		0 if the entry was already removed or reused by others
+ * \retval		negative error number on failure
+ */
+int lfsck_namespace_repair_dirent(const struct lu_env *env,
+				  struct lfsck_component *com,
+				  struct dt_object *parent,
+				  struct dt_object *child,
+				  const char *name, const char *name2,
+				  __u16 type, bool update, bool dec)
+{
+	struct lfsck_thread_info *info	= lfsck_env_info(env);
+	struct dt_insert_rec	 *rec	= &info->lti_dt_rec;
+	const struct lu_fid	 *cfid	= lfsck_dto2fid(child);
+	struct lu_fid		 *tfid	= &info->lti_fid5;
+	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
+	struct dt_device	 *dev	= lfsck->li_next;
+	struct thandle		 *th	= NULL;
+	struct lustre_handle	  lh	= { 0 };
+	int			  rc	= 0;
+	ENTRY;
+
+	if (unlikely(!dt_try_as_dir(env, parent)))
+		GOTO(log, rc = -ENOTDIR);
+
+	rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+			      MDS_INODELOCK_UPDATE, LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(unlock1, rc = PTR_ERR(th));
+
+	rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	if (update) {
+		rec->rec_type = lfsck_object_type(child) & S_IFMT;
+		rec->rec_fid = cfid;
+		rc = dt_declare_insert(env, parent,
+				       (const struct dt_rec *)rec,
+				       (const struct dt_key *)name2, th);
+		if (rc != 0)
+			GOTO(stop, rc);
+	}
+
+	if (dec) {
+		rc = dt_declare_ref_del(env, parent, th);
+		if (rc != 0)
+			GOTO(stop, rc);
+	}
+
+	rc = dt_trans_start(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, parent, 0);
+	rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+		       (const struct dt_key *)name, BYPASS_CAPA);
+	/* Someone else has already removed the bad name entry (race). */
+	if (rc == -ENOENT)
+		GOTO(unlock2, rc = 0);
+
+	if (rc != 0)
+		GOTO(unlock2, rc);
+
+	/* Someone else has removed the bad name entry and reused the name
+	 * for another object (race). */
+	if (!lu_fid_eq(tfid, cfid))
+		GOTO(unlock2, rc = 0);
+
+	if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+		GOTO(unlock2, rc = 1);
+
+	rc = dt_delete(env, parent, (const struct dt_key *)name, th,
+		       BYPASS_CAPA);
+	if (rc != 0)
+		GOTO(unlock2, rc);
+
+	if (update) {
+		rc = dt_insert(env, parent,
+			       (const struct dt_rec *)rec,
+			       (const struct dt_key *)name2, th,
+			       BYPASS_CAPA, 1);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+	}
+
+	if (dec) {
+		rc = dt_ref_del(env, parent, th);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+	}
+
+	GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+	dt_write_unlock(env, parent);
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+	/* We are not sure whether the child will become an orphan or not.
+	 * Record it in the LFSCK tracing file for further checking in
+	 * the second-stage scanning. */
+	if (!update && !dec && rc == 0)
+		lfsck_namespace_trace_update(env, com, cfid,
+					     LNTF_CHECK_LINKEA, true);
+
+unlock1:
+	lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
+	       "entry for: parent "DFID", child "DFID", name %s, type "
+	       "in name entry %o, type claimed by child %o. repair it "
+	       "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
+	       PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
+	       name, type, update ? lfsck_object_type(child) : 0,
+	       update ? "updating" : "removing", name2, rc);
+
+	if (rc != 0) {
+		struct lfsck_namespace *ns = com->lc_file_ram;
+
+		ns->ln_flags |= LF_INCONSISTENT;
+	}
+
+	return rc;
+}
+
+/**
  * Update the ".." name entry for the given object.
  *
  * The object's ".." is corrupted, this function will update the ".." name
@@ -1233,18 +2147,22 @@ log:
  * \param[in] obj	pointer to the orphan object to be handled
  * \param[in] pfid	the new fid for the object's ".." name entry
  * \param[in,out] lh	ldlm lock handler for the given @obj
+ * \param[out] type	to tell the caller what the inconsistency is
  *
  * \retval		positive number for repaired cases
  * \retval		0 if nothing to be repaired
  * \retval		negative error number on failure
  */
-static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
-				      struct lfsck_component *com,
-				      struct dt_object *obj,
-				      const struct lu_fid *pfid,
-				      struct lustre_handle *lh)
+static int
+lfsck_namespace_dsd_orphan(const struct lu_env *env,
+			   struct lfsck_component *com,
+			   struct dt_object *obj,
+			   const struct lu_fid *pfid,
+			   struct lustre_handle *lh,
+			   enum lfsck_namespace_inconsistency_type *type)
 {
 	struct lfsck_thread_info *info = lfsck_env_info(env);
+	struct lfsck_namespace	 *ns   = com->lc_file_ram;
 	int			  rc;
 	ENTRY;
 
@@ -1254,6 +2172,18 @@ static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
 	if (rc < 0 && rc != -ENODATA)
 		RETURN(rc);
 
+	*type = LNIT_MUL_REF;
+
+	/* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
+	 * ever tried to verify some remote MDT-object that resides on this
+	 * MDT, but this MDT failed to respond such request. So means there
+	 * may be some remote name entry on other MDT that references this
+	 * object with another name, so we cannot know whether this linkEA
+	 * is valid or not. So keep it there and maybe resolved when next
+	 * LFSCK run. */
+	if (ns->ln_flags & LF_INCOMPLETE)
+		RETURN(0);
+
 	/* The unique linkEA is invalid, even if the ".."
name entry may be * valid, we still cannot know via which name entry this directory * will be referenced. Then handle it as pure orphan. */ @@ -1302,6 +2232,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, struct lu_name *cname = &info->lti_name; const struct lu_fid *cfid = lfsck_dto2fid(child); struct lu_fid *tfid = &info->lti_fid3; + struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_object *parent = NULL; int rc = 0; @@ -1314,7 +2245,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, *retry = true; else rc = lfsck_namespace_dsd_orphan(env, com, child, - pfid, lh); + pfid, lh, type); GOTO(out, rc); } @@ -1329,6 +2260,16 @@ lfsck_namespace_dsd_single(const struct lu_env *env, * name entry the child will be referenced, since all known entries * have been verified during the first-stage scanning. */ if (!dt_object_exists(parent)) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT + * has ever tried to verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond such request. So + * means there may be some remote name entry on other MDT that + * references this object with another name, so we cannot know + * whether this linkEA is valid or not. So keep it there and + * maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + if (!lustre_handle_is_used(lh) && retry != NULL) { *retry = true; @@ -1336,12 +2277,36 @@ lfsck_namespace_dsd_single(const struct lu_env *env, } lfsck_ibits_unlock(lh, LCK_EX); + +lost_parent: /* Create the lost parent as an orphan. */ rc = lfsck_namespace_create_orphan(env, com, parent); - if (rc >= 0) + if (rc >= 0) { /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -EEXIST)) { + /* Unfortunately, someone reused the name + * under the parent by race. 
So we have + * to remove the linkEA entry from + * current child object. It means that the + * LFSCK cannot recover the system + * totally back to its original status, + * but it is necessary to make the + * current system to be consistent. */ + rc = lfsck_namespace_shrink_linkea(env, + com, child, ldata, + cname, tfid, true); + if (rc >= 0) { + snprintf(info->lti_tmpbuf, + sizeof(info->lti_tmpbuf), + "-"DFID, PFID(pfid)); + rc = lfsck_namespace_insert_orphan(env, + com, child, info->lti_tmpbuf, + "D", NULL); + } + } + } GOTO(out, rc); } @@ -1352,7 +2317,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, *retry = true; else rc = lfsck_namespace_dsd_orphan(env, com, child, - pfid, lh); + pfid, lh, type); GOTO(out, rc); } @@ -1360,6 +2325,16 @@ lfsck_namespace_dsd_single(const struct lu_env *env, rc = dt_lookup(env, parent, (struct dt_rec *)tfid, (const struct dt_key *)cname->ln_name, BYPASS_CAPA); if (rc == -ENOENT) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT + * has ever tried to verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond such request. So + * means there may be some remote name entry on other MDT that + * references this object with another name, so we cannot know + * whether this linkEA is valid or not. So keep it there and + * maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + if (!lustre_handle_is_used(lh) && retry != NULL) { *retry = true; @@ -1370,6 +2345,32 @@ lfsck_namespace_dsd_single(const struct lu_env *env, /* Add the missing name entry back to the namespace. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -ESTALE)) + /* It may happen when the remote object has been + * removed, but the local MDT is not aware of that. */ + goto lost_parent; + + if (unlikely(rc == -EEXIST)) { + /* Unfortunately, someone reused the name under the + * parent by race. 
So we have to remove the linkEA + * entry from current child object. It means that the + * LFSCK cannot recover the system totally back to + * its original status, but it is necessary to make + * the current system to be consistent. + * + * It also may be because of the LFSCK found some + * internal status of create operation. Under such + * case, nothing to be done. */ + rc = lfsck_namespace_shrink_linkea_cond(env, com, + parent, child, ldata, cname, tfid); + if (rc >= 0) { + snprintf(info->lti_tmpbuf, + sizeof(info->lti_tmpbuf), + "-"DFID, PFID(pfid)); + rc = lfsck_namespace_insert_orphan(env, com, + child, info->lti_tmpbuf, "D", NULL); + } + } GOTO(out, rc); } @@ -1392,7 +2393,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, tfid, cname); if (rc == 0) rc = lfsck_namespace_dsd_orphan(env, com, child, - pfid, lh); + pfid, lh, type); GOTO(out, rc); } @@ -1457,9 +2458,12 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env, const struct lu_fid *cfid = lfsck_dto2fid(child); struct lu_fid *tfid = &info->lti_fid3; struct lu_fid *pfid2 = &info->lti_fid4; + struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct dt_object *parent = NULL; struct linkea_data ldata_new = { 0 }; + int count = 0; int rc = 0; bool once = true; ENTRY; @@ -1559,61 +2563,231 @@ rebuild: rc = lfsck_namespace_rebuild_linkea(env, com, child, &ldata_new); + if (rc < 0) + RETURN(rc); + + linkea_del_buf(ldata, cname); + linkea_first_entry(ldata); + /* There may be some invalid dangling name entries under + * other parent directories, remove all of them. 
*/ + while (ldata->ld_lee != NULL) { + lfsck_namespace_unpack_linkea_entry(ldata, + cname, tfid, info->lti_key); + if (!fid_is_sane(tfid)) + goto next; + + parent = lfsck_object_find_bottom(env, lfsck, + tfid); + if (IS_ERR(parent)) { + rc = PTR_ERR(parent); + if (rc != -ENOENT && + bk->lb_param & LPF_FAILOUT) + RETURN(rc); + + goto next; + } + + if (!dt_object_exists(parent)) { + lfsck_object_put(env, parent); + goto next; + } + + rc = lfsck_namespace_repair_dirent(env, com, + parent, child, cname->ln_name, + cname->ln_name, S_IFDIR, false, true); + lfsck_object_put(env, parent); + if (rc < 0) { + if (bk->lb_param & LPF_FAILOUT) + RETURN(rc); + + goto next; + } + + count += rc; + +next: + linkea_del_buf(ldata, cname); + } + + ns->ln_dirent_repaired += count; + + RETURN(rc); + } + + lfsck_ibits_unlock(lh, LCK_EX); + /* The name entry references another MDT-object that may be + * created by the LFSCK for repairing dangling name entry. + * Try to replace it. */ + rc = lfsck_namespace_replace_cond(env, com, parent, child, + tfid, cname); + lfsck_object_put(env, parent); + if (rc < 0) + RETURN(rc); + + if (rc > 0) + goto rebuild; + + linkea_del_buf(ldata, cname); + } + + if (ldata->ld_leh->leh_reccount == 1) { + rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata, + lh, type, NULL); + + RETURN(rc); + } + + /* All linkEA entries are invalid and removed, then handle the @child + * as an orphan.*/ + if (ldata->ld_leh->leh_reccount == 0) { + rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh, + type); + + RETURN(rc); + } + + linkea_first_entry(ldata); + /* If the dangling name entry for the orphan directory object has + * been remvoed, then just check whether the directory object is + * still under the .lustre/lost+found/MDTxxxx/ or not. */ + if (lpf) { + lpf = false; + goto again; + } + + /* There is no linkEA entry that matches the ".." name entry. Find + * the first linkEA entry that both parent and name entry exist to + * rebuild a new ".." name entry. 
*/ + if (once) { + once = false; + goto again; + } + + RETURN(rc); +} + +/** + * Repair the object's nlink attribute. + * + * If all the known name entries have been verified, then the object's hard + * link attribute should match the object's linkEA entries count unless the + * object's has too much hard link to be recorded in the linkEA. Such cases + * should have been marked in the LFSCK tracing file. Otherwise, trust the + * linkEA to update the object's nlink attribute. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] obj pointer to the dt_object to be handled + * \param[in,out] nlink pointer to buffer to object's hard lock count before + * and after the repairing + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +static int lfsck_namespace_repair_nlink(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *obj, __u32 *nlink) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la3; + struct lu_fid *tfid = &info->lti_fid3; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_bottom; + const struct lu_fid *cfid = lfsck_dto2fid(obj); + struct dt_object *child = NULL; + struct thandle *th = NULL; + struct linkea_data ldata = { 0 }; + struct lustre_handle lh = { 0 }; + __u32 old = *nlink; + int rc = 0; + __u8 flags; + ENTRY; + + LASSERT(!dt_object_remote(obj)); + LASSERT(S_ISREG(lfsck_object_type(obj))); + + child = lfsck_object_find_by_dev(env, dev, cfid); + if (IS_ERR(child)) + GOTO(log, rc = PTR_ERR(child)); + + rc = lfsck_ibits_lock(env, lfsck, child, &lh, + MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(log, rc = PTR_ERR(th)); + + la->la_valid = LA_NLINK; + rc = 
dt_declare_attr_set(env, child, la, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, child, 0); + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has + * ever tried to verify some remote MDT-object that resides on this + * MDT, but this MDT failed to respond such request. So means there + * may be some remote name entry on other MDT that references this + * object with another name, so we cannot know whether this linkEA + * is valid or not. So keep it there and maybe resolved when next + * LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(unlock, rc = 0); + + fid_cpu_to_be(tfid, cfid); + rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags, + (const struct dt_key *)tfid, BYPASS_CAPA); + if (rc != 0) + GOTO(unlock, rc); - /* XXX: there will be other patch. */ + if (flags & LNTF_SKIP_NLINK) + GOTO(unlock, rc = 0); - RETURN(rc); - } + rc = lfsck_links_read2(env, child, &ldata); + if (rc == -ENODATA) + GOTO(unlock, rc = 0); - lfsck_ibits_unlock(lh, LCK_EX); - /* The name entry references another MDT-object that may be - * created by the LFSCK for repairing dangling name entry. - * Try to replace it. */ - rc = lfsck_namespace_replace_cond(env, com, parent, child, - tfid, cname); - lfsck_object_put(env, parent); - if (rc < 0) - RETURN(rc); + if (rc != 0) + GOTO(unlock, rc); - if (rc > 0) - goto rebuild; + if (*nlink == ldata.ld_leh->leh_reccount) + GOTO(unlock, rc = 0); - linkea_del_buf(ldata, cname); - } + la->la_nlink = *nlink = ldata.ld_leh->leh_reccount; + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(unlock, rc = 1); - if (ldata->ld_leh->leh_reccount == 1) { - rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata, - lh, type, NULL); + rc = dt_attr_set(env, child, la, th, BYPASS_CAPA); - RETURN(rc); - } + GOTO(unlock, rc = (rc == 0 ? 
1 : rc)); - /* All linkEA entries are invalid and removed, then handle the @child - * as an orphan.*/ - if (ldata->ld_leh->leh_reccount == 0) { - rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh); +unlock: + dt_write_unlock(env, child); - RETURN(rc); - } +stop: + dt_trans_stop(env, dev, th); - linkea_first_entry(ldata); - /* If the dangling name entry for the orphan directory object has - * been remvoed, then just check whether the directory object is - * still under the .lustre/lost+found/MDTxxxx/ or not. */ - if (lpf) { - lpf = false; - goto again; - } +log: + lfsck_ibits_unlock(&lh, LCK_EX); + if (child != NULL && !IS_ERR(child)) + lfsck_object_put(env, child); - /* There is no linkEA entry that matches the ".." name entry. Find - * the first linkEA entry that both parent and name entry exist to - * rebuild a new ".." name entry. */ - if (once) { - once = false; - goto again; - } + CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s " + "nlink count from %u to %u: rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc); - RETURN(rc); + if (rc != 0) + ns->ln_flags |= LF_INCONSISTENT; + + return rc; } /** @@ -1673,7 +2847,8 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env, LASSERT(!dt_object_remote(child)); - if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { + if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) && + !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the" "the namespace LFSCK, then the LFSCK cannot guarantee" "all the name entries have been verified in first-stage" @@ -1757,6 +2932,21 @@ lock: * but no parent references this child * directory, then handle it as orphan. */ lfsck_ibits_unlock(&lh, LCK_EX); + type = LNIT_MUL_REF; + + /* If the LFSCK is marked as LF_INCOMPLETE, + * then means some MDT has ever tried to + * verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond + * such request. 
So means there may be some + * remote name entry on other MDT that + * references this object with another name, + * so we cannot know whether this linkEA is + * valid or not. So keep it there and maybe + * resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), "-"DFID, PFID(pfid)); rc = lfsck_namespace_insert_orphan(env, com, child, @@ -1785,7 +2975,8 @@ lock: goto lock; if (unlikely(ldata.ld_leh->leh_reccount == 0)) { - rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh); + rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh, + &type); GOTO(out, rc); } @@ -1815,6 +3006,9 @@ out: case LNIT_UNMATCHED_PAIRS: ns->ln_unmatched_pairs_repaired++; break; + case LNIT_MUL_REF: + ns->ln_mul_ref_repaired++; + break; default: break; } @@ -1921,6 +3115,8 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, GOTO(out, rc = PTR_ERR(parent)); if (!dt_object_exists(parent)) { + +lost_parent: if (ldata.ld_leh->leh_reccount > 1) { /* If it is NOT the last linkEA entry, then * there is still other chance to make the @@ -1929,6 +3125,22 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, rc = lfsck_namespace_shrink_linkea(env, com, child, &ldata, cname, pfid, true); } else { + /* If the LFSCK is marked as LF_INCOMPLETE, + * then means some MDT has ever tried to + * verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond + * such request. So means there may be some + * remote name entry on other MDT that + * references this object with another name, + * so we cannot know whether this linkEA is + * valid or not. So keep it there and maybe + * resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) { + lfsck_object_put(env, parent); + + GOTO(out, rc = 0); + } + /* Create the lost parent as an orphan. 
*/ rc = lfsck_namespace_create_orphan(env, com, parent); @@ -1944,7 +3156,20 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); - linkea_next_entry(&ldata); + if (unlikely(rc == -EEXIST)) + /* Unfortunately, someone reused the + * name under the parent by race. So we + * have to remove the linkEA entry from + * current child object. It means that + * the LFSCK cannot recover the system + * totally back to its original status, + * but it is necessary to make the + * current system to be consistent. */ + rc = lfsck_namespace_shrink_linkea(env, + com, child, &ldata, + cname, pfid, true); + else + linkea_next_entry(&ldata); } lfsck_object_put(env, parent); @@ -2036,17 +3261,49 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, continue; } + /* If the LFSCK is marked as LF_INCOMPLETE, then means some + * MDT has ever tried to verify some remote MDT-object that + * resides on this MDT, but this MDT failed to respond such + * request. So means there may be some remote name entry on + * other MDT that references this object with another name, + * so we cannot know whether this linkEA is valid or not. + * So keep it there and maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) { + lfsck_object_put(env, parent); + + GOTO(out, rc = 0); + } + /* Add the missing name entry back to the namespace. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -ESTALE)) + /* It may happen when the remote object has been + * removed, but the local MDT is not aware of that. */ + goto lost_parent; + + if (unlikely(rc == -EEXIST)) + /* Unfortunately, someone reused the name under the + * parent by race. So we have to remove the linkEA + * entry from current child object. 
It means that the + * LFSCK cannot recover the system totally back to + * its original status, but it is necessary to make + * the current system to be consistent. + * + * It also may be because of the LFSCK found some + * internal status of create operation. Under such + * case, nothing to be done. */ + rc = lfsck_namespace_shrink_linkea_cond(env, com, + parent, child, &ldata, cname, pfid); + else + linkea_next_entry(&ldata); + lfsck_object_put(env, parent); if (rc < 0) GOTO(out, rc); if (rc > 0) repaired = true; - - linkea_next_entry(&ldata); } GOTO(out, rc = 0); @@ -2061,7 +3318,14 @@ out: count = ldata.ld_leh->leh_reccount; } - if (count == 0) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some + * MDT has ever tried to verify some remote MDT-object that + * resides on this MDT, but this MDT failed to respond such + * request. So means there may be some remote name entry on + * other MDT that references this object with another name, + * so we cannot know whether this linkEA is valid or not. + * So keep it there and maybe resolved when next LFSCK run. */ + if (count == 0 && !(ns->ln_flags & LF_INCOMPLETE)) { /* If the child becomes orphan, then insert it into * the global .lustre/lost+found/MDTxxxx directory. */ rc = lfsck_namespace_insert_orphan(env, com, child, "", "O", @@ -2069,8 +3333,10 @@ out: if (rc < 0) return rc; - if (rc > 0) + if (rc > 0) { + ns->ln_mul_ref_repaired++; repaired = true; + } } rc = dt_attr_get(env, child, la, BYPASS_CAPA); @@ -2078,8 +3344,12 @@ out: return rc; if (la->la_nlink != count) { - /* XXX: there will be other patch(es) for MDT-object - * hard links verification. 
*/ + rc = lfsck_namespace_repair_nlink(env, com, child, + &la->la_nlink); + if (rc > 0) { + ns->ln_objs_nlink_repaired++; + rc = 0; + } } if (repaired) { @@ -2110,12 +3380,14 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, "dirent_repaired: "LPU64"\n" "linkea_repaired: "LPU64"\n" "nlinks_repaired: "LPU64"\n" - "lost_found: "LPU64"\n" "multiple_linked_checked: "LPU64"\n" "multiple_linked_repaired: "LPU64"\n" "unknown_inconsistency: "LPU64"\n" "unmatched_pairs_repaired: "LPU64"\n" "dangling_repaired: "LPU64"\n" + "multiple_referenced_repaired: "LPU64"\n" + "bad_file_type_repaired: "LPU64"\n" + "lost_dirent_repaired: "LPU64"\n" "success_count: %u\n" "run_time_phase1: %u seconds\n" "run_time_phase2: %u seconds\n", @@ -2129,12 +3401,14 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, ns->ln_dirent_repaired, ns->ln_linkea_repaired, ns->ln_objs_nlink_repaired, - ns->ln_objs_lost_found, ns->ln_mul_linked_checked, ns->ln_mul_linked_repaired, ns->ln_unknown_inconsistency, ns->ln_unmatched_pairs_repaired, ns->ln_dangling_repaired, + ns->ln_mul_ref_repaired, + ns->ln_bad_type_repaired, + ns->ln_lost_dirent_repaired, ns->ln_success_count, time_phase1, time_phase2); @@ -2145,11 +3419,12 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, static int lfsck_namespace_reset(const struct lu_env *env, struct lfsck_component *com, bool init) { - struct lfsck_instance *lfsck = com->lc_lfsck; - struct lfsck_namespace *ns = com->lc_file_ram; - struct dt_object *root; - struct dt_object *dto; - int rc; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + struct dt_object *root; + struct dt_object *dto; + int rc; ENTRY; root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid); @@ -2192,7 +3467,10 @@ static int lfsck_namespace_reset(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - rc = lfsck_namespace_store(env, com, true); + 
lad->lad_incomplete = 0; + CFS_RESET_BITMAP(lad->lad_bitmap); + + rc = lfsck_namespace_store(env, com); GOTO(out, rc); @@ -2245,7 +3523,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); log: @@ -2267,7 +3545,8 @@ static int lfsck_namespace_prep(const struct lu_env *env, struct lfsck_position *pos = &com->lc_pos_start; int rc; - if (ns->ln_status == LS_COMPLETED) { + rc = lfsck_namespace_load_bitmap(env, com); + if (rc != 0 || ns->ln_status == LS_COMPLETED) { rc = lfsck_namespace_reset(env, com, false); if (rc == 0) rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true); @@ -2305,7 +3584,6 @@ static int lfsck_namespace_prep(const struct lu_env *env, ns->ln_objs_repaired_phase2 = 0; ns->ln_objs_failed_phase2 = 0; ns->ln_objs_nlink_repaired = 0; - ns->ln_objs_lost_found = 0; ns->ln_dirent_repaired = 0; ns->ln_linkea_repaired = 0; ns->ln_mul_linked_checked = 0; @@ -2313,6 +3591,9 @@ static int lfsck_namespace_prep(const struct lu_env *env, ns->ln_unknown_inconsistency = 0; ns->ln_unmatched_pairs_repaired = 0; ns->ln_dangling_repaired = 0; + ns->ln_mul_ref_repaired = 0; + ns->ln_bad_type_repaired = 0; + ns->ln_lost_dirent_repaired = 0; fid_zero(&ns->ln_fid_latest_scanned_phase2); if (list_empty(&com->lc_link_dir)) list_add_tail(&com->lc_link_dir, @@ -2521,8 +3802,9 @@ static int lfsck_namespace_post(const struct lu_env *env, list_del_init(&com->lc_link_dir); list_move_tail(&com->lc_link, &lfsck->li_list_double_scan); } else if (result == 0) { - ns->ln_status = lfsck->li_status; - if (ns->ln_status == 0) + if (lfsck->li_status != 0) + ns->ln_status = lfsck->li_status; + else ns->ln_status = LS_STOPPED; if (ns->ln_status != LS_PAUSED) { list_del_init(&com->lc_link_dir); @@ -2543,7 +3825,7 @@ static int lfsck_namespace_post(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, 
false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n", @@ -2721,9 +4003,27 @@ out: static int lfsck_namespace_double_scan(const struct lu_env *env, struct lfsck_component *com) { - struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + int rc; + + rc = lfsck_double_scan_generic(env, com, ns->ln_status); + if (thread_is_stopped(&lad->lad_thread)) { + LASSERT(list_empty(&lad->lad_req_list)); + LASSERT(list_empty(&lad->lad_mdt_phase1_list)); - return lfsck_double_scan_generic(env, com, ns->ln_status); + spin_lock(<ds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_namespace_phase_list) { + list_del_init(<d->ltd_namespace_phase_list); + } + spin_unlock(<ds->ltd_lock); + } + + return rc; } static void lfsck_namespace_data_release(const struct lu_env *env, @@ -2756,31 +4056,161 @@ static void lfsck_namespace_data_release(const struct lu_env *env, } spin_unlock(<ds->ltd_lock); - CFS_FREE_BITMAP(lad->lad_bitmap); + if (likely(lad->lad_bitmap != NULL)) + CFS_FREE_BITMAP(lad->lad_bitmap); OBD_FREE_PTR(lad); } +static void lfsck_namespace_quit(const struct lu_env *env, + struct lfsck_component *com) +{ + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + + LASSERT(lad != NULL); + + lfsck_quit_generic(env, com); + + LASSERT(thread_is_init(&lad->lad_thread) || + thread_is_stopped(&lad->lad_thread)); + LASSERT(list_empty(&lad->lad_req_list)); + + spin_lock(<ds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list, + ltd_namespace_phase_list) { + list_del_init(<d->ltd_namespace_phase_list); + } + 
list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_namespace_phase_list) { + list_del_init(<d->ltd_namespace_phase_list); + } + spin_unlock(<ds->ltd_lock); +} + static int lfsck_namespace_in_notify(const struct lu_env *env, struct lfsck_component *com, - struct lfsck_request *lr) + struct lfsck_request *lr, + struct thandle *th) { struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_assistant_data *lad = com->lc_data; struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; struct lfsck_tgt_desc *ltd; + int rc; bool fail = false; ENTRY; - if (lr->lr_event != LE_PHASE1_DONE && - lr->lr_event != LE_PHASE2_DONE && - lr->lr_event != LE_PEER_EXIT) + switch (lr->lr_event) { + case LE_CREATE_ORPHAN: { + struct dt_object *orphan = NULL; + + CDEBUG(D_LFSCK, "%s: namespace LFSCK handling notify from " + "MDT %x to create orphan"DFID" with type %o\n", + lfsck_lfsck2name(lfsck), lr->lr_index, + PFID(&lr->lr_fid), lr->lr_type); + + orphan = lfsck_object_find(env, lfsck, &lr->lr_fid); + if (IS_ERR(orphan)) + GOTO(out_create, rc = PTR_ERR(orphan)); + + if (dt_object_exists(orphan)) + GOTO(out_create, rc = -EEXIST); + + rc = lfsck_namespace_create_orphan_local(env, com, orphan, + lr->lr_type); + + GOTO(out_create, rc = (rc == 1) ? 
0 : rc); + +out_create: + CDEBUG(D_LFSCK, "%s: namespace LFSCK handled notify from " + "MDT %x to create orphan"DFID" with type %o: rc = %d\n", + lfsck_lfsck2name(lfsck), lr->lr_index, + PFID(&lr->lr_fid), lr->lr_type, rc); + + if (orphan != NULL && !IS_ERR(orphan)) + lfsck_object_put(env, orphan); + + return rc; + } + case LE_SKIP_NLINK_DECLARE: { + struct dt_object *obj = com->lc_obj; + struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + __u8 flags = 0; + + LASSERT(th != NULL); + + rc = dt_declare_delete(env, obj, + (const struct dt_key *)key, th); + if (rc == 0) + rc = dt_declare_insert(env, obj, + (const struct dt_rec *)&flags, + (const struct dt_key *)key, th); + + RETURN(rc); + } + case LE_SKIP_NLINK: { + struct dt_object *obj = com->lc_obj; + struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + __u8 flags = 0; + bool exist = false; + ENTRY; + + LASSERT(th != NULL); + + fid_cpu_to_be(key, &lr->lr_fid); + rc = dt_lookup(env, obj, (struct dt_rec *)&flags, + (const struct dt_key *)key, BYPASS_CAPA); + if (rc == 0) { + if (flags & LNTF_SKIP_NLINK) + RETURN(0); + + exist = true; + } else if (rc != -ENOENT) { + GOTO(log, rc); + } + + flags |= LNTF_SKIP_NLINK; + if (exist) { + rc = dt_delete(env, obj, (const struct dt_key *)key, + th, BYPASS_CAPA); + if (rc != 0) + GOTO(log, rc); + } + + rc = dt_insert(env, obj, (const struct dt_rec *)&flags, + (const struct dt_key *)key, th, BYPASS_CAPA, 1); + + GOTO(log, rc); + +log: + CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID + " to be skipped for namespace double scan: rc = %d\n", + lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc); + + if (rc != 0) + /* If we cannot record this object in the LFSCK tracing, + * we have to mark the LFSC as LF_INCOMPLETE, then the + * LFSCK will skip nlink attribute verification for + * all objects. 
*/ + ns->ln_flags |= LF_INCOMPLETE; + + return 0; + } + case LE_PHASE1_DONE: + case LE_PHASE2_DONE: + case LE_PEER_EXIT: + break; + default: RETURN(-EINVAL); + } CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, " - "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event, - lr->lr_index, lr->lr_status); + "status %d, flags %x\n", lfsck_lfsck2name(lfsck), lr->lr_event, + lr->lr_index, lr->lr_status, lr->lr_flags2); spin_lock(<ds->ltd_lock); ltd = LTD_TGT(ltds, lr->lr_index); @@ -2805,6 +4235,9 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, break; } + if (lr->lr_flags2 & LF_INCOMPLETE) + ns->ln_flags |= LF_INCOMPLETE; + if (list_empty(<d->ltd_namespace_list)) list_add_tail(<d->ltd_namespace_list, &lad->lad_mdt_list); @@ -2864,7 +4297,7 @@ static struct lfsck_operations lfsck_namespace_ops = { .lfsck_dump = lfsck_namespace_dump, .lfsck_double_scan = lfsck_namespace_double_scan, .lfsck_data_release = lfsck_namespace_data_release, - .lfsck_quit = lfsck_quit_generic, + .lfsck_quit = lfsck_namespace_quit, .lfsck_in_notify = lfsck_namespace_in_notify, .lfsck_query = lfsck_namespace_query, }; @@ -2880,7 +4313,7 @@ static struct lfsck_operations lfsck_namespace_ops = { * and the users can make the decision about how to handle it with * more human knownledge. (by default) * - * 2) Re-create the missed MDT-object with the FID information. + * 2) Re-create the missing MDT-object with the FID information. 
* * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component @@ -3089,14 +4522,14 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, struct dt_object *dir = lnr->lnr_obj; struct dt_object *obj = NULL; const struct lu_fid *pfid = lfsck_dto2fid(dir); - struct dt_device *dev; + struct dt_device *dev = NULL; struct lustre_handle lh = { 0 }; bool repaired = false; bool dtlocked = false; bool remove; bool newdata; bool log = false; - int idx; + int idx = 0; int count = 0; int rc; enum lfsck_namespace_inconsistency_type type = LNIT_NONE; @@ -3153,7 +4586,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which " "did not join the namespace LFSCK\n", lfsck_lfsck2name(lfsck), idx); - ns->ln_flags |= LF_INCOMPLETE; + lfsck_lad_set_bitmap(env, com, idx); GOTO(out, rc = -ENODEV); } @@ -3223,16 +4656,34 @@ again: } /* It may happen when the remote object has been removed, - * but the local MDT does not aware of that. */ + * but the local MDT is not aware of that. */ goto dangling; } else if (rc == 0) { count = ldata.ld_leh->leh_reccount; rc = linkea_links_find(&ldata, cname, pfid); if ((rc == 0) && - (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) + (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) { + if ((lfsck_object_type(obj) & S_IFMT) != + lnr->lnr_type) { + ns->ln_flags |= LF_INCONSISTENT; + type = LNIT_BAD_TYPE; + } + goto record; + } ns->ln_flags |= LF_INCONSISTENT; + + /* If the file type stored in the name entry does not match + * the file type claimed by the object, and the object does + * not recognize the name entry, then it is quite possible + * that the name entry is corrupted. */ + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) { + type = LNIT_BAD_DIRENT; + + GOTO(stop, rc = 0); + } + /* For sub-dir object, we cannot make sure whether the sub-dir * back references the parent via ".." 
name entry correctly or * not in the LFSCK first-stage scanning. It may be that the @@ -3246,6 +4697,9 @@ again: newdata = false; goto nodata; } else if (unlikely(rc == -EINVAL)) { + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) + type = LNIT_BAD_TYPE; + count = 1; ns->ln_flags |= LF_INCONSISTENT; /* The magic crashed, we are not sure whether there are more @@ -3254,6 +4708,9 @@ again: newdata = true; goto nodata; } else if (rc == -ENODATA) { + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) + type = LNIT_BAD_TYPE; + count = 1; ns->ln_flags |= LF_UPGRADE; remove = false; @@ -3291,6 +4748,33 @@ nodata: GOTO(stop, rc); rc = lfsck_links_write(env, obj, &ldata, handle); + if (unlikely(rc == -ENOSPC) && + S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) { + if (handle != NULL) { + LASSERT(dt_write_locked(env, obj)); + + dt_write_unlock(env, obj); + dtlocked = false; + + dt_trans_stop(env, dev, handle); + handle = NULL; + + lfsck_ibits_unlock(&lh, LCK_EX); + } + + rc = lfsck_namespace_trace_update(env, com, + &lnr->lnr_fid, LNTF_SKIP_NLINK, true); + if (rc != 0) + /* If we cannot record this object in the + * LFSCK tracing, we have to mark the LFSCK + * as LF_INCOMPLETE, then the LFSCK will + * skip nlink attribute verification for + * all objects. */ + ns->ln_flags |= LF_INCOMPLETE; + + GOTO(out, rc = 0); + } + if (rc != 0) GOTO(stop, rc); @@ -3343,6 +4827,34 @@ stop: out: lfsck_ibits_unlock(&lh, LCK_EX); + + if (rc >= 0) { + switch (type) { + case LNIT_BAD_TYPE: + log = false; + rc = lfsck_namespace_repair_dirent(env, com, dir, + obj, lnr->lnr_name, lnr->lnr_name, + lnr->lnr_type, true, false); + if (rc > 0) + repaired = true; + break; + case LNIT_BAD_DIRENT: + log = false; + /* XXX: This is a bad dirent, we do not know whether + * the original name entry reference a regular + * file or a directory, then keep the parent's + * nlink count unchanged here. 
*/ + rc = lfsck_namespace_repair_dirent(env, com, dir, + obj, lnr->lnr_name, lnr->lnr_name, + lnr->lnr_type, false, false); + if (rc > 0) + repaired = true; + break; + default: + break; + } + } + down_write(&com->lc_sem); if (rc < 0) { CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle " @@ -3352,6 +4864,12 @@ out: lnr->lnr_namelen, lnr->lnr_name, rc); lfsck_namespace_record_failure(env, lfsck, ns); + if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG || + rc == -ETIMEDOUT || rc == -EHOSTDOWN || + rc == -EHOSTUNREACH || rc == -EINPROGRESS) && + dev != NULL && dev != lfsck->li_next) + lfsck_lad_set_bitmap(env, com, idx); + if (!(bk->lb_param & LPF_FAILOUT)) rc = 0; } else { @@ -3370,6 +4888,12 @@ out: case LNIT_DANGLING: ns->ln_dangling_repaired++; break; + case LNIT_BAD_TYPE: + ns->ln_bad_type_repaired++; + break; + case LNIT_BAD_DIRENT: + ns->ln_dirent_repaired++; + break; default: break; } @@ -3380,12 +4904,14 @@ out: &ns->ln_pos_first_inconsistent, false); } + rc = 0; } up_write(&com->lc_sem); if (obj != NULL && !IS_ERR(obj)) lfsck_object_put(env, obj); + return rc; } @@ -3493,7 +5019,7 @@ checkpoint: ns->ln_time_last_checkpoint = cfs_time_current_sec(); ns->ln_objs_checked_phase2 += com->lc_new_checked; com->lc_new_checked = 0; - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); if (rc != 0) GOTO(put, rc); @@ -3567,24 +5093,116 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, ns->ln_time_last_complete = ns->ln_time_last_checkpoint; ns->ln_success_count++; } else if (rc == 0) { - ns->ln_status = lfsck->li_status; - if (ns->ln_status == 0) + if (lfsck->li_status != 0) + ns->ln_status = lfsck->li_status; + else ns->ln_status = LS_STOPPED; } else { ns->ln_status = LS_FAILED; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); return rc; } +static int 
+lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *args, int rc) +{ + return 0; +} + +/** + * Notify remote LFSCK instances about former failures. + * + * The local LFSCK instance has recorded which MDTs have ever failed to respond + * to some LFSCK verification requests (maybe because of network issues or some + * trouble on the MDT itself). During the response gap such an MDT may have + * missed the verification of some name entries, so it cannot know whether the + * related MDT-objects are referenced by those name entries or not. Then, in the + * second-stage scanning, these MDT-objects will be regarded as orphans; if such + * an MDT-object contains a bad linkEA for back reference, it will misguide the + * LFSCK into generating a wrong name entry when repairing the orphan. + * + * To avoid the above trouble, when the namespace LFSCK finishes the first-stage + * scanning, it will scan the bitmap for the MDTs that have ever failed, and + * notify them that they have missed the verification of some name entries and + * should skip the handling of orphan MDT-objects. 
+ * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] lr pointer to the lfsck request + */ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, struct lfsck_component *com, struct lfsck_request *lr) { - /* XXX: TBD */ + struct lfsck_async_interpret_args *laia = + &lfsck_env_info(env)->lti_laia2; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct ptlrpc_request_set *set; + int rc = 0; + ENTRY; + + set = ptlrpc_prep_set(); + if (set == NULL) + GOTO(out, rc = -ENOMEM); + + lr->lr_flags2 = ns->ln_flags | LF_INCOMPLETE; + memset(laia, 0, sizeof(*laia)); + lad->lad_touch_gen++; + + spin_lock(&ltds->ltd_lock); + while (!list_empty(&lad->lad_mdt_list)) { + ltd = list_entry(lad->lad_mdt_list.next, + struct lfsck_tgt_desc, + ltd_namespace_list); + if (ltd->ltd_namespace_gen == lad->lad_touch_gen) + break; + + ltd->ltd_namespace_gen = lad->lad_touch_gen; + list_move_tail(&ltd->ltd_namespace_list, + &lad->lad_mdt_list); + if (!lad->lad_incomplete || + !cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) { + ltd->ltd_namespace_failed = 0; + continue; + } + + ltd->ltd_namespace_failed = 1; + spin_unlock(&ltds->ltd_lock); + rc = lfsck_async_request(env, ltd->ltd_exp, lr, set, + lfsck_namespace_assistant_sync_failures_interpret, + laia, LFSCK_NOTIFY); + if (rc != 0) + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " + "to sync failure with MDT %x: rc = %d\n", + lfsck_lfsck2name(lfsck), ltd->ltd_index, rc); + + spin_lock(&ltds->ltd_lock); + } + spin_unlock(&ltds->ltd_lock); + + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + + GOTO(out, rc); + +out: + if (rc != 0) + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " + "to sync failure with MDTs, and related MDTs " + "may handle orphan improperly: rc = 
%d\n", + lfsck_lfsck2name(lfsck), rc); + + EXIT; } struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {