X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=f91bdcd501db95ea3589d29b26acfa4368a39d19;hb=846dd0cb79fc309703afdaae7622e4ddeb0e2e49;hp=8a688c79e07c3eee59ad8782ea71061ef240b0b7;hpb=7093fb094f0cf7ea1a5c9a21b198f41f2558a9b8;p=fs%2Flustre-release.git diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 8a688c7..f91bdcd 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -116,7 +116,6 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, le64_to_cpu(src->ln_objs_repaired_phase2); dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2); dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired); - dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found); fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2, &src->ln_fid_latest_scanned_phase2); dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired); @@ -129,6 +128,10 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, le64_to_cpu(src->ln_unmatched_pairs_repaired); dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired); dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired); + dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired); + dst->ln_lost_dirent_repaired = + le64_to_cpu(src->ln_lost_dirent_repaired); + dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size); } static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, @@ -159,7 +162,6 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, cpu_to_le64(src->ln_objs_repaired_phase2); dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2); dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired); - dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found); fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2, &src->ln_fid_latest_scanned_phase2); dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired); @@ 
-172,6 +174,10 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, cpu_to_le64(src->ln_unmatched_pairs_repaired); dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired); dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired); + dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired); + dst->ln_lost_dirent_repaired = + cpu_to_le64(src->ln_lost_dirent_repaired); + dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size); } static void lfsck_namespace_record_failure(const struct lu_env *env, @@ -196,6 +202,81 @@ static void lfsck_namespace_record_failure(const struct lu_env *env, } /**
+ * Load the MDT bitmap from the lfsck_namespace tracing file.
+ *
+ * The in-memory bitmap (lad->lad_bitmap) is first grown, by doubling, until
+ * it can hold the larger of the local MDT target bitmap size and the size
+ * recorded in ns->ln_bitmap_size (at least BITS_PER_LONG bits).  If
+ * ln_bitmap_size is zero the bitmap is reset and lad_incomplete cleared.
+ * Otherwise (ln_bitmap_size + 7) / 8 bytes are read from the
+ * XATTR_NAME_LFSCK_BITMAP xattr of the tracing object, and lad_incomplete
+ * is cleared when cfs_bitmap_check_empty() reports the bitmap empty and
+ * set otherwise.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] com	pointer to the lfsck component
+ *
+ * \retval		positive number for data corruption: the bitmap xattr
+ *			is missing, empty or shorter than recorded, or the
+ *			required size cannot be reached by doubling a __u32
+ * \retval		0 for success
+ * \retval		negative error number on failure
+ */
+static int lfsck_namespace_load_bitmap(const struct lu_env *env,
+				       struct lfsck_component *com)
+{
+	struct dt_object		*obj	= com->lc_obj;
+	struct lfsck_assistant_data	*lad	= com->lc_data;
+	struct lfsck_namespace		*ns	= com->lc_file_ram;
+	cfs_bitmap_t			*bitmap = lad->lad_bitmap;
+	ssize_t				 size;
+	__u32				 nbits;
+	int				 rc;
+	ENTRY;
+
+	/* The bitmap must cover both the known MDT targets and whatever
+	 * size was recorded when the tracing file was last stored. */
+	if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
+	    ns->ln_bitmap_size)
+		nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
+	else
+		nbits = ns->ln_bitmap_size;
+
+	if (unlikely(nbits < BITS_PER_LONG))
+		nbits = BITS_PER_LONG;
+
+	if (nbits > bitmap->size) {
+		__u32 new_bits = bitmap->size;
+		cfs_bitmap_t *new_bitmap;
+
+		/* Doubling from zero never grows; nbits >= BITS_PER_LONG
+		 * here, so that is a safe seed for an empty bitmap. */
+		if (unlikely(new_bits == 0))
+			new_bits = BITS_PER_LONG;
+
+		while (new_bits < nbits) {
+			/* ln_bitmap_size is read from disk.  Once new_bits
+			 * has its top bit set, one more shift wraps the
+			 * __u32 to 0 and the loop would spin forever, so
+			 * report the recorded size as corrupted instead. */
+			if (unlikely(new_bits > (~(__u32)0 >> 1)))
+				RETURN(1);
+
+			new_bits <<= 1;
+		}
+
+		new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
+		if (new_bitmap == NULL)
+			RETURN(-ENOMEM);
+
+		/* Publish the larger bitmap before releasing the old one. */
+		lad->lad_bitmap = new_bitmap;
+		CFS_FREE_BITMAP(bitmap);
+		bitmap = new_bitmap;
+	}
+
+	/* Nothing was stored: start from a clean bitmap. */
+	if (ns->ln_bitmap_size == 0) {
+		lad->lad_incomplete = 0;
+		CFS_RESET_BITMAP(bitmap);
+
+		RETURN(0);
+	}
+
+	/* Bytes needed for ln_bitmap_size bits, rounded up. */
+	size = (ns->ln_bitmap_size + 7) >> 3;
+	rc = dt_xattr_get(env, obj,
+			  lfsck_buf_get(env, bitmap->data, size),
+			  XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
+	/* A missing, empty or mis-sized xattr contradicts the non-zero
+	 * ln_bitmap_size: report it as corruption. */
+	if (rc == -ERANGE || rc == -ENODATA || rc == 0)
+		RETURN(1);
+
+	if (rc < 0)
+		RETURN(rc);
+
+	/* Short read: rc is positive here, i.e. the corruption case. */
+	if (rc != size)
+		RETURN(rc);
+
+	if (cfs_bitmap_check_empty(bitmap))
+		lad->lad_incomplete = 0;
+	else
+		lad->lad_incomplete = 1;
+
+	RETURN(0);
+}
+ +/** * \retval +ve: the lfsck_namespace is broken, the caller should reset it. * \retval 0: succeed. * \retval -ve: failed cases. @@ -233,17 +314,30 @@ static int lfsck_namespace_load(const struct lu_env *env, } static int lfsck_namespace_store(const struct lu_env *env, - struct lfsck_component *com, bool init) + struct lfsck_component *com) { - struct dt_object *obj = com->lc_obj; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct thandle *handle; - int len = com->lc_file_size; - int rc; + struct dt_object *obj = com->lc_obj; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + cfs_bitmap_t *bitmap = NULL; + struct thandle *handle; + __u32 nbits = 0; + int len = com->lc_file_size; + int rc; ENTRY; + if (lad != NULL) { + bitmap = lad->lad_bitmap; + nbits = bitmap->size; + + LASSERT(nbits > 0); + LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits); + } + + ns->ln_bitmap_size = nbits; lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk, - (struct lfsck_namespace *)com->lc_file_ram); + ns); handle = dt_trans_create(env, lfsck->li_bottom); if (IS_ERR(handle)) GOTO(log, rc = PTR_ERR(handle)); @@ -254,15 +348,26 @@ static int lfsck_namespace_store(const struct lu_env *env, if (rc != 0) GOTO(out, rc); + if (bitmap != NULL) { + rc = dt_declare_xattr_set(env, obj, + lfsck_buf_get(env, bitmap->data, nbits >> 3), + XATTR_NAME_LFSCK_BITMAP, 0, handle); + if (rc != 0) + GOTO(out, rc); + } + rc = dt_trans_start_local(env, lfsck->li_bottom, handle); if (rc != 0) GOTO(out, rc); rc = dt_xattr_set(env, obj, 
lfsck_buf_get(env, com->lc_file_disk, len), - XATTR_NAME_LFSCK_NAMESPACE, - init ? LU_XATTR_CREATE : LU_XATTR_REPLACE, - handle, BYPASS_CAPA); + XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA); + if (rc == 0 && bitmap != NULL) + rc = dt_xattr_set(env, obj, + lfsck_buf_get(env, bitmap->data, nbits >> 3), + XATTR_NAME_LFSCK_BITMAP, 0, handle, + BYPASS_CAPA); GOTO(out, rc); @@ -286,7 +391,7 @@ static int lfsck_namespace_init(const struct lu_env *env, ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; down_write(&com->lc_sem); - rc = lfsck_namespace_store(env, com, true); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); return rc; } @@ -822,22 +927,473 @@ log: return rc; } +/** + * Add the specified name entry back to namespace. + * + * If there is a linkEA entry that back references a name entry under + * some parent directory, but such parent directory does not have the + * claimed name entry. On the other hand, the linkEA entries count is + * not larger than the MDT-object's hard link count. Under such case, + * it is quite possible that the name entry is lost. Then the LFSCK + * should add the name entry back to the namespace. 
+ * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] parent pointer to the directory under which the name entry + * will be inserted into + * \param[in] child pointer to the object referenced by the name entry + * that to be inserted into the parent + * \param[in] name the name for the child in the parent directory + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ static int lfsck_namespace_insert_normal(const struct lu_env *env, struct lfsck_component *com, struct dt_object *parent, struct dt_object *child, const char *name) { - /* XXX: TBD */ - return 0; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + int rc = 0; + ENTRY; + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(log, rc = -ENOTDIR); + + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(log, rc = 1); + + /* Hold update lock on the parent to prevent others to access. 
*/ + rc = lfsck_ibits_lock(env, lfsck, parent, &lh, + MDS_INODELOCK_UPDATE, LCK_EX); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rec->rec_type = lfsck_object_type(child) & S_IFMT; + rec->rec_fid = lfsck_dto2fid(child); + rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)name, th); + if (rc != 0) + GOTO(stop, rc); + + if (S_ISDIR(rec->rec_type)) { + rc = dt_declare_ref_add(env, parent, th); + if (rc != 0) + GOTO(stop, rc); + } + + memset(la, 0, sizeof(*la)); + la->la_ctime = cfs_time_current_sec(); + la->la_valid = LA_CTIME; + rc = dt_declare_attr_set(env, parent, la, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)name, th, BYPASS_CAPA, 1); + if (rc != 0) + GOTO(stop, rc); + + if (S_ISDIR(rec->rec_type)) { + dt_write_lock(env, parent, 0); + rc = dt_ref_add(env, parent, th); + dt_write_unlock(env, parent); + if (rc != 0) + GOTO(stop, rc); + } + + la->la_ctime = cfs_time_current_sec(); + rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA); + + GOTO(stop, rc = (rc == 0 ? 1 : rc)); + +stop: + dt_trans_stop(env, dev, th); + +unlock: + lfsck_ibits_unlock(&lh, LCK_EX); + +log: + CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with " + "the name %s and type %o to the parent "DFID": rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name, + lfsck_object_type(child) & S_IFMT, + PFID(lfsck_dto2fid(parent)), rc); + + if (rc != 0) { + struct lfsck_namespace *ns = com->lc_file_ram; + + ns->ln_flags |= LF_INCONSISTENT; + if (rc > 0) + ns->ln_lost_dirent_repaired++; + } + + return rc; +} + +/** + * Create the specified orphan MDT-object on remote MDT. 
+ * + * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to + * ask the remote LFSCK instance to create the specified orphan object + * under .lustre/lost+found/MDTxxxx/ directory with the name: + * ${FID}-P-${conflict_version}. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] orphan pointer to the orphan MDT-object + * \param[in] type the orphan's type to be created + * + * type "P": The orphan object to be created was a parent directory + * of some DMT-object which linkEA shows that the @orphan + * object is missing. + * + * \see lfsck_layout_recreate_parent() for more types. + * + * \retval positive number for repaired cases + * \retval 0 if needs to repair nothing + * \retval negative error number on failure + */ +static int lfsck_namespace_create_orphan_remote(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *orphan, + __u32 type) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_request *lr = &info->lti_lr; + struct lu_seq_range *range = &info->lti_range; + const struct lu_fid *fid = lfsck_dto2fid(orphan); + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct seq_server_site *ss = + lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); + struct lfsck_tgt_desc *ltd = NULL; + struct ptlrpc_request *req = NULL; + int rc; + ENTRY; + + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(out, rc = 1); + + fld_range_set_mdt(range); + rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range); + if (rc != 0) + GOTO(out, rc); + + ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index); + if (ltd == NULL) { + ns->ln_flags |= LF_INCOMPLETE; + + GOTO(out, rc = -ENODEV); + } + + req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp), + &RQF_LFSCK_NOTIFY); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY); + 
if (rc != 0) { + ptlrpc_request_free(req); + + GOTO(out, rc); + } + + lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST); + memset(lr, 0, sizeof(*lr)); + lr->lr_event = LE_CREATE_ORPHAN; + lr->lr_index = lfsck_dev_idx(lfsck->li_bottom); + lr->lr_active = LFSCK_TYPE_NAMESPACE; + lr->lr_fid = *fid; + lr->lr_type = type; + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + ptlrpc_req_finished(req); + + if (rc == 0) + rc = 1; + else if (rc == -EEXIST) + rc = 0; + + GOTO(out, rc); + +out: + CDEBUG(D_LFSCK, "%s: namespace LFSCK create object " + DFID" on the MDT %x remotely: rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(fid), + ltd != NULL ? ltd->ltd_index : -1, rc); + + if (ltd != NULL) + lfsck_tgt_put(ltd); + + return rc; } +/** + * Create the specified orphan MDT-object locally. + * + * For the case that the parent MDT-object stored in some MDT-object's + * linkEA entry is lost, the LFSCK will re-create the parent object as + * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory + * with the name ${FID}-P-${conflict_version}. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] orphan pointer to the orphan MDT-object to be created + * \param[in] type the orphan's type to be created + * + * type "P": The orphan object to be created was a parent directory + * of some DMT-object which linkEA shows that the @orphan + * object is missing. + * + * \see lfsck_layout_recreate_parent() for more types. 
+ *
+ * \retval		positive number for repaired cases (also returned in
+ *			dry-run mode, where nothing is actually created)
+ * \retval		negative error number on failure, including
+ *			-ENAMETOOLONG when the generated lost+found name
+ *			does not fit into the local name buffer
+ */
+static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
+					       struct lfsck_component *com,
+					       struct dt_object *orphan,
+					       __u32 type)
+{
+	struct lfsck_thread_info *info	= lfsck_env_info(env);
+	struct lu_attr		 *la	= &info->lti_la;
+	struct dt_allocation_hint *hint	= &info->lti_hint;
+	struct dt_object_format	 *dof	= &info->lti_dof;
+	struct lu_name		 *cname	= &info->lti_name2;
+	struct dt_insert_rec	 *rec	= &info->lti_dt_rec;
+	struct lu_fid		 *tfid	= &info->lti_fid;
+	const struct lu_fid	 *cfid	= lfsck_dto2fid(orphan);
+	const struct lu_fid	 *pfid;
+	struct lfsck_instance	 *lfsck	= com->lc_lfsck;
+	struct dt_device	 *dev	= lfsck->li_bottom;
+	struct dt_object	 *parent = NULL;
+	struct dt_object	 *child	= NULL;
+	struct thandle		 *th	= NULL;
+	struct lustre_handle	  lh	= { 0 };
+	struct linkea_data	  ldata	= { 0 };
+	struct lu_buf		  linkea_buf;
+	/* NOTE(review): 64 bytes is presumed to cover the widest expansion
+	 * of DFID plus "-P-<int>" -- confirm against the DFID macro.  The
+	 * truncation check in the naming loop keeps this safe either way. */
+	char			  name[64];
+	int			  namelen;
+	int			  idx	= 0;
+	int			  rc	= 0;
+	ENTRY;
+
+	LASSERT(!dt_object_exists(orphan));
+	LASSERT(!dt_object_remote(orphan));
+
+	/* The log label below prints cname->ln_name, and cname lives in the
+	 * shared lfsck_thread_info area, so it may still hold a pointer left
+	 * by an earlier user.  Clear it before the first GOTO(log). */
+	cname->ln_name = NULL;
+
+	/* @orphan maybe not attached to lfsck->li_bottom */
+	child = lfsck_object_find_by_dev(env, dev, cfid);
+	if (IS_ERR(child))
+		GOTO(log, rc = PTR_ERR(child));
+
+	if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+		GOTO(log, rc = 1);
+
+	/* Create .lustre/lost+found/MDTxxxx when needed. */
+	if (unlikely(lfsck->li_lpf_obj == NULL)) {
+		rc = lfsck_create_lpf(env, lfsck);
+		if (rc != 0)
+			GOTO(log, rc);
+	}
+
+	parent = lfsck->li_lpf_obj;
+	pfid = lfsck_dto2fid(parent);
+
+	/* Hold update lock on the parent to prevent others to access. */
+	rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+			      MDS_INODELOCK_UPDATE, LCK_EX);
+	if (rc != 0)
+		GOTO(log, rc);
+
+	/* Pick the first ${FID}-P-${idx} name not yet used in the parent. */
+	do {
+		namelen = snprintf(name, sizeof(name), DFID"-P-%d",
+				   PFID(cfid), idx++);
+		/* snprintf() returns the untruncated length.  A clipped name
+		 * would make ln_namelen overstate the string, and since the
+		 * clipped text no longer changes with idx the loop would
+		 * never end once that name exists.  Fail cleanly instead. */
+		if (unlikely(namelen < 0 || namelen >= (int)sizeof(name)))
+			GOTO(unlock1, rc = -ENAMETOOLONG);
+
+		rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+			       (const struct dt_key *)name, BYPASS_CAPA);
+		if (rc != 0 && rc != -ENOENT)
+			GOTO(unlock1, rc);
+	} while (rc == 0);
+
+	cname->ln_name = name;
+	cname->ln_namelen = namelen;
+
+	memset(la, 0, sizeof(*la));
+	la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
+	la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
+		       LA_ATIME | LA_MTIME | LA_CTIME;
+
+	child->do_ops->do_ah_init(env, hint, parent, child,
+				  la->la_mode & S_IFMT);
+
+	memset(dof, 0, sizeof(*dof));
+	dof->dof_type = dt_mode_to_dft(type);
+
+	/* Build the linkEA (name under lost+found, parent FID) up front so
+	 * that its size is known when the transaction is declared. */
+	rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+	if (rc != 0)
+		GOTO(unlock1, rc);
+
+	rc = linkea_add_buf(&ldata, cname, pfid);
+	if (rc != 0)
+		GOTO(unlock1, rc);
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(unlock1, rc = PTR_ERR(th));
+
+	/* Declare phase: object creation, linkEA, name entry and, for a
+	 * directory, the extra link counts on both child and parent. */
+	rc = dt_declare_create(env, child, la, hint, dof, th);
+	if (rc == 0 && S_ISDIR(type))
+		rc = dt_declare_ref_add(env, child, th);
+
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+		       ldata.ld_leh->leh_len);
+	rc = dt_declare_xattr_set(env, child, &linkea_buf,
+				  XATTR_NAME_LINK, 0, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rec->rec_type = type;
+	rec->rec_fid = cfid;
+	rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+			       (const struct dt_key *)name, th);
+	if (rc == 0 && S_ISDIR(type))
+		rc = dt_declare_ref_add(env, parent, th);
+
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* Execute phase, child first (under its write lock). */
+	dt_write_lock(env, child, 0);
+	rc = dt_create(env, child, la, hint, dof, th);
+	if (rc != 0)
+		GOTO(unlock2, rc);
+
+	if (S_ISDIR(type)) {
+		if (unlikely(!dt_try_as_dir(env, child)))
+			GOTO(unlock2, rc = -ENOTDIR);
+
+		/* "." references the new directory itself ... */
+		rec->rec_type = S_IFDIR;
+		rec->rec_fid = cfid;
+		rc = dt_insert(env, child, (const struct dt_rec *)rec,
+			       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+
+		/* ... and ".." references the lost+found parent. */
+		rec->rec_fid = pfid;
+		rc = dt_insert(env, child, (const struct dt_rec *)rec,
+			       (const struct dt_key *)dotdot, th,
+			       BYPASS_CAPA, 1);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+
+		rc = dt_ref_add(env, child, th);
+		if (rc != 0)
+			GOTO(unlock2, rc);
+	}
+
+	rc = dt_xattr_set(env, child, &linkea_buf,
+			  XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+	dt_write_unlock(env, child);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* Finally make the orphan reachable from the parent. */
+	rec->rec_type = type;
+	rec->rec_fid = cfid;
+	rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+		       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+	if (rc == 0 && S_ISDIR(type)) {
+		dt_write_lock(env, parent, 0);
+		rc = dt_ref_add(env, parent, th);
+		dt_write_unlock(env, parent);
+	}
+
+	/* Success is reported as 1 ("repaired"), not 0. */
+	GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+	dt_write_unlock(env, child);
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+unlock1:
+	lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+	CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
+	       "the object "DFID", name = %s, type %o: rc = %d\n",
+	       lfsck_lfsck2name(lfsck), PFID(cfid),
+	       cname->ln_name != NULL ? cname->ln_name : "", type, rc);
+
+	if (child != NULL && !IS_ERR(child))
+		lfsck_object_put(env, child);
+
+	return rc;
+}
+ +/** + * Create the specified orphan MDT-object. + * + * For the case that the parent MDT-object stored in some MDT-object's + * linkEA entry is lost, the LFSCK will re-create the parent object as + * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory + * with the name: ${FID}-P-${conflict_version}. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] orphan pointer to the orphan MDT-object + * + * type "P": The orphan object to be created was a parent directory + * of some DMT-object which linkEA shows that the @orphan + * object is missing. 
+ * + * \see lfsck_layout_recreate_parent() for more types. + * + * \retval positive number for repaired cases + * \retval 0 if needs to repair nothing + * \retval negative error number on failure + */ static int lfsck_namespace_create_orphan(const struct lu_env *env, struct lfsck_component *com, struct dt_object *orphan) { - /* XXX: TBD */ - return 0; + struct lfsck_namespace *ns = com->lc_file_ram; + int rc; + + if (dt_object_remote(orphan)) + rc = lfsck_namespace_create_orphan_remote(env, com, orphan, + S_IFDIR); + else + rc = lfsck_namespace_create_orphan_local(env, com, orphan, + S_IFDIR); + + if (rc != 0) + ns->ln_flags |= LF_INCONSISTENT; + + return rc; } /** @@ -1322,6 +1878,155 @@ log: } /** + * Repair invalid name entry. + * + * If the name entry contains invalid information, such as bad file type + * or (and) corrupted object FID, then either remove the name entry or + * udpate the name entry with the given (right) information. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] parent pointer to the parent directory + * \param[in] child pointer to the object referenced by the name entry + * \param[in] name the old name of the child under the parent directory + * \param[in] name2 the new name of the child under the parent directory + * \param[in] type the type claimed by the name entry + * \param[in] update update the name entry if true; otherwise, remove it + * \param[in] dec decrease the parent nlink count if true + * + * \retval positive number for repaired successfully + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +int lfsck_namespace_repair_dirent(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *parent, + struct dt_object *child, + const char *name, const char *name2, + __u16 type, bool update, bool dec) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct dt_insert_rec *rec = &info->lti_dt_rec; + 
const struct lu_fid *cfid = lfsck_dto2fid(child); + struct lu_fid *tfid = &info->lti_fid5; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + int rc = 0; + ENTRY; + + if (unlikely(!dt_try_as_dir(env, parent))) + GOTO(log, rc = -ENOTDIR); + + rc = lfsck_ibits_lock(env, lfsck, parent, &lh, + MDS_INODELOCK_UPDATE, LCK_EX); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(unlock1, rc = PTR_ERR(th)); + + rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th); + if (rc != 0) + GOTO(stop, rc); + + if (update) { + rec->rec_type = lfsck_object_type(child) & S_IFMT; + rec->rec_fid = cfid; + rc = dt_declare_insert(env, parent, + (const struct dt_rec *)rec, + (const struct dt_key *)name2, th); + if (rc != 0) + GOTO(stop, rc); + } + + if (dec) { + rc = dt_declare_ref_del(env, parent, th); + if (rc != 0) + GOTO(stop, rc); + } + + rc = dt_trans_start(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, parent, 0); + rc = dt_lookup(env, parent, (struct dt_rec *)tfid, + (const struct dt_key *)name, BYPASS_CAPA); + /* Someone has removed the bad name entry by race. */ + if (rc == -ENOENT) + GOTO(unlock2, rc = 0); + + if (rc != 0) + GOTO(unlock2, rc); + + /* Someone has removed the bad name entry and reused it for other + * object by race. */ + if (!lu_fid_eq(tfid, cfid)) + GOTO(unlock2, rc = 0); + + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(unlock2, rc = 1); + + rc = dt_delete(env, parent, (const struct dt_key *)name, th, + BYPASS_CAPA); + if (rc != 0) + GOTO(unlock2, rc); + + if (update) { + rc = dt_insert(env, parent, + (const struct dt_rec *)rec, + (const struct dt_key *)name2, th, + BYPASS_CAPA, 1); + if (rc != 0) + GOTO(unlock2, rc); + } + + if (dec) { + rc = dt_ref_del(env, parent, th); + if (rc != 0) + GOTO(unlock2, rc); + } + + GOTO(unlock2, rc = (rc == 0 ? 
1 : rc)); + +unlock2: + dt_write_unlock(env, parent); + +stop: + dt_trans_stop(env, dev, th); + + /* We are not sure whether the child will become orphan or not. + * Record it in the LFSCK tracing file for further checking in + * the second-stage scanning. */ + if (!update && !dec && rc == 0) + lfsck_namespace_trace_update(env, com, cfid, + LNTF_CHECK_LINKEA, true); + +unlock1: + lfsck_ibits_unlock(&lh, LCK_EX); + +log: + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name " + "entry for: parent "DFID", child "DFID", name %s, type " + "in name entry %o, type claimed by child %o. repair it " + "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck), + PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)), + name, type, update ? lfsck_object_type(child) : 0, + update ? "updating" : "removing", name2, rc); + + if (rc != 0) { + struct lfsck_namespace *ns = com->lc_file_ram; + + ns->ln_flags |= LF_INCONSISTENT; + } + + return rc; +} + +/** * Update the ".." name entry for the given object. * * The object's ".." is corrupted, this function will update the ".." name @@ -1464,6 +2169,7 @@ lfsck_namespace_dsd_orphan(const struct lu_env *env, enum lfsck_namespace_inconsistency_type *type) { struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_namespace *ns = com->lc_file_ram; int rc; ENTRY; @@ -1474,6 +2180,17 @@ lfsck_namespace_dsd_orphan(const struct lu_env *env, RETURN(rc); *type = LNIT_MUL_REF; + + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has + * ever tried to verify some remote MDT-object that resides on this + * MDT, but this MDT failed to respond such request. So means there + * may be some remote name entry on other MDT that references this + * object with another name, so we cannot know whether this linkEA + * is valid or not. So keep it there and maybe resolved when next + * LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + RETURN(0); + /* The unique linkEA is invalid, even if the ".." 
name entry may be * valid, we still cannot know via which name entry this directory * will be referenced. Then handle it as pure orphan. */ @@ -1522,6 +2239,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env, struct lu_name *cname = &info->lti_name; const struct lu_fid *cfid = lfsck_dto2fid(child); struct lu_fid *tfid = &info->lti_fid3; + struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_object *parent = NULL; int rc = 0; @@ -1549,6 +2267,16 @@ lfsck_namespace_dsd_single(const struct lu_env *env, * name entry the child will be referenced, since all known entries * have been verified during the first-stage scanning. */ if (!dt_object_exists(parent)) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT + * has ever tried to verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond such request. So + * means there may be some remote name entry on other MDT that + * references this object with another name, so we cannot know + * whether this linkEA is valid or not. So keep it there and + * maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + if (!lustre_handle_is_used(lh) && retry != NULL) { *retry = true; @@ -1556,12 +2284,36 @@ lfsck_namespace_dsd_single(const struct lu_env *env, } lfsck_ibits_unlock(lh, LCK_EX); + +lost_parent: /* Create the lost parent as an orphan. */ rc = lfsck_namespace_create_orphan(env, com, parent); - if (rc >= 0) + if (rc >= 0) { /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -EEXIST)) { + /* Unfortunately, someone reused the name + * under the parent by race. So we have + * to remove the linkEA entry from + * current child object. 
It means that the + * LFSCK cannot recover the system + * totally back to its original status, + * but it is necessary to make the + * current system to be consistent. */ + rc = lfsck_namespace_shrink_linkea(env, + com, child, ldata, + cname, tfid, true); + if (rc >= 0) { + snprintf(info->lti_tmpbuf, + sizeof(info->lti_tmpbuf), + "-"DFID, PFID(pfid)); + rc = lfsck_namespace_insert_orphan(env, + com, child, info->lti_tmpbuf, + "D", NULL); + } + } + } GOTO(out, rc); } @@ -1580,6 +2332,16 @@ lfsck_namespace_dsd_single(const struct lu_env *env, rc = dt_lookup(env, parent, (struct dt_rec *)tfid, (const struct dt_key *)cname->ln_name, BYPASS_CAPA); if (rc == -ENOENT) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT + * has ever tried to verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond such request. So + * means there may be some remote name entry on other MDT that + * references this object with another name, so we cannot know + * whether this linkEA is valid or not. So keep it there and + * maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + if (!lustre_handle_is_used(lh) && retry != NULL) { *retry = true; @@ -1590,6 +2352,32 @@ lfsck_namespace_dsd_single(const struct lu_env *env, /* Add the missing name entry back to the namespace. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -ESTALE)) + /* It may happen when the remote object has been + * removed, but the local MDT is not aware of that. */ + goto lost_parent; + + if (unlikely(rc == -EEXIST)) { + /* Unfortunately, someone reused the name under the + * parent by race. So we have to remove the linkEA + * entry from current child object. It means that the + * LFSCK cannot recover the system totally back to + * its original status, but it is necessary to make + * the current system to be consistent. 
+ * + * It also may be because of the LFSCK found some + * internal status of create operation. Under such + * case, nothing to be done. */ + rc = lfsck_namespace_shrink_linkea_cond(env, com, + parent, child, ldata, cname, tfid); + if (rc >= 0) { + snprintf(info->lti_tmpbuf, + sizeof(info->lti_tmpbuf), + "-"DFID, PFID(pfid)); + rc = lfsck_namespace_insert_orphan(env, com, + child, info->lti_tmpbuf, "D", NULL); + } + } GOTO(out, rc); } @@ -1677,9 +2465,12 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env, const struct lu_fid *cfid = lfsck_dto2fid(child); struct lu_fid *tfid = &info->lti_fid3; struct lu_fid *pfid2 = &info->lti_fid4; + struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct dt_object *parent = NULL; struct linkea_data ldata_new = { 0 }; + int count = 0; int rc = 0; bool once = true; ENTRY; @@ -1779,8 +2570,53 @@ rebuild: rc = lfsck_namespace_rebuild_linkea(env, com, child, &ldata_new); + if (rc < 0) + RETURN(rc); + + linkea_del_buf(ldata, cname); + linkea_first_entry(ldata); + /* There may be some invalid dangling name entries under + * other parent directories, remove all of them. */ + while (ldata->ld_lee != NULL) { + lfsck_namespace_unpack_linkea_entry(ldata, + cname, tfid, info->lti_key); + if (!fid_is_sane(tfid)) + goto next; + + parent = lfsck_object_find_bottom(env, lfsck, + tfid); + if (IS_ERR(parent)) { + rc = PTR_ERR(parent); + if (rc != -ENOENT && + bk->lb_param & LPF_FAILOUT) + RETURN(rc); + + goto next; + } + + if (!dt_object_exists(parent)) { + lfsck_object_put(env, parent); + goto next; + } + + rc = lfsck_namespace_repair_dirent(env, com, + parent, child, cname->ln_name, + cname->ln_name, S_IFDIR, false, true); + lfsck_object_put(env, parent); + if (rc < 0) { + if (bk->lb_param & LPF_FAILOUT) + RETURN(rc); + + goto next; + } + + count += rc; + +next: + linkea_del_buf(ldata, cname); + } - /* XXX: there will be other patch. 
*/ + ns->ln_dirent_repaired += count; RETURN(rc); } @@ -1894,7 +2730,8 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env, LASSERT(!dt_object_remote(child)); - if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { + if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) && + !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) { CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the" "the namespace LFSCK, then the LFSCK cannot guarantee" "all the name entries have been verified in first-stage" @@ -1979,6 +2816,20 @@ lock: * directory, then handle it as orphan. */ lfsck_ibits_unlock(&lh, LCK_EX); type = LNIT_MUL_REF; + + /* If the LFSCK is marked as LF_INCOMPLETE, + * then means some MDT has ever tried to + * verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond + * such request. So means there may be some + * remote name entry on other MDT that + * references this object with another name, + * so we cannot know whether this linkEA is + * valid or not. So keep it there and maybe + * resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(out, rc = 0); + snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), "-"DFID, PFID(pfid)); rc = lfsck_namespace_insert_orphan(env, com, child, @@ -2147,6 +2998,8 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, GOTO(out, rc = PTR_ERR(parent)); if (!dt_object_exists(parent)) { + +lost_parent: if (ldata.ld_leh->leh_reccount > 1) { /* If it is NOT the last linkEA entry, then * there is still other chance to make the @@ -2155,6 +3008,22 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, rc = lfsck_namespace_shrink_linkea(env, com, child, &ldata, cname, pfid, true); } else { + /* If the LFSCK is marked as LF_INCOMPLETE, + * then means some MDT has ever tried to + * verify some remote MDT-object that resides + * on this MDT, but this MDT failed to respond + * such request. 
So means there may be some + * remote name entry on other MDT that + * references this object with another name, + * so we cannot know whether this linkEA is + * valid or not. So keep it there and maybe + * resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) { + lfsck_object_put(env, parent); + + GOTO(out, rc = 0); + } + /* Create the lost parent as an orphan. */ rc = lfsck_namespace_create_orphan(env, com, parent); @@ -2170,7 +3039,20 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, /* Add the missing name entry to the parent. */ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); - linkea_next_entry(&ldata); + if (unlikely(rc == -EEXIST)) + /* Unfortunately, someone reused the + * name under the parent by race. So we + * have to remove the linkEA entry from + * current child object. It means that + * the LFSCK cannot recover the system + * totally back to its original status, + * but it is necessary to make the + * current system to be consistent. */ + rc = lfsck_namespace_shrink_linkea(env, + com, child, &ldata, + cname, pfid, true); + else + linkea_next_entry(&ldata); } lfsck_object_put(env, parent); @@ -2262,17 +3144,49 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, continue; } + /* If the LFSCK is marked as LF_INCOMPLETE, then means some + * MDT has ever tried to verify some remote MDT-object that + * resides on this MDT, but this MDT failed to respond such + * request. So means there may be some remote name entry on + * other MDT that references this object with another name, + * so we cannot know whether this linkEA is valid or not. + * So keep it there and maybe resolved when next LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) { + lfsck_object_put(env, parent); + + GOTO(out, rc = 0); + } + /* Add the missing name entry back to the namespace. 
*/ rc = lfsck_namespace_insert_normal(env, com, parent, child, cname->ln_name); + if (unlikely(rc == -ESTALE)) + /* It may happen when the remote object has been + * removed, but the local MDT is not aware of that. */ + goto lost_parent; + + if (unlikely(rc == -EEXIST)) + /* Unfortunately, someone reused the name under the + * parent by race. So we have to remove the linkEA + * entry from current child object. It means that the + * LFSCK cannot recover the system totally back to + * its original status, but it is necessary to make + * the current system to be consistent. + * + * It also may be because of the LFSCK found some + * internal status of create operation. Under such + * case, nothing to be done. */ + rc = lfsck_namespace_shrink_linkea_cond(env, com, + parent, child, &ldata, cname, pfid); + else + linkea_next_entry(&ldata); + lfsck_object_put(env, parent); if (rc < 0) GOTO(out, rc); if (rc > 0) repaired = true; - - linkea_next_entry(&ldata); } GOTO(out, rc = 0); @@ -2287,7 +3201,14 @@ out: count = ldata.ld_leh->leh_reccount; } - if (count == 0) { + /* If the LFSCK is marked as LF_INCOMPLETE, then means some + * MDT has ever tried to verify some remote MDT-object that + * resides on this MDT, but this MDT failed to respond such + * request. So means there may be some remote name entry on + * other MDT that references this object with another name, + * so we cannot know whether this linkEA is valid or not. + * So keep it there and maybe resolved when next LFSCK run. */ + if (count == 0 && !(ns->ln_flags & LF_INCOMPLETE)) { /* If the child becomes orphan, then insert it into * the global .lustre/lost+found/MDTxxxx directory. 
*/ rc = lfsck_namespace_insert_orphan(env, com, child, "", "O", @@ -2338,13 +3259,14 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, "dirent_repaired: "LPU64"\n" "linkea_repaired: "LPU64"\n" "nlinks_repaired: "LPU64"\n" - "lost_found: "LPU64"\n" "multiple_linked_checked: "LPU64"\n" "multiple_linked_repaired: "LPU64"\n" "unknown_inconsistency: "LPU64"\n" "unmatched_pairs_repaired: "LPU64"\n" "dangling_repaired: "LPU64"\n" "multiple_referenced_repaired: "LPU64"\n" + "bad_file_type_repaired: "LPU64"\n" + "lost_dirent_repaired: "LPU64"\n" "success_count: %u\n" "run_time_phase1: %u seconds\n" "run_time_phase2: %u seconds\n", @@ -2358,13 +3280,14 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, ns->ln_dirent_repaired, ns->ln_linkea_repaired, ns->ln_objs_nlink_repaired, - ns->ln_objs_lost_found, ns->ln_mul_linked_checked, ns->ln_mul_linked_repaired, ns->ln_unknown_inconsistency, ns->ln_unmatched_pairs_repaired, ns->ln_dangling_repaired, ns->ln_mul_ref_repaired, + ns->ln_bad_type_repaired, + ns->ln_lost_dirent_repaired, ns->ln_success_count, time_phase1, time_phase2); @@ -2375,11 +3298,12 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, static int lfsck_namespace_reset(const struct lu_env *env, struct lfsck_component *com, bool init) { - struct lfsck_instance *lfsck = com->lc_lfsck; - struct lfsck_namespace *ns = com->lc_file_ram; - struct dt_object *root; - struct dt_object *dto; - int rc; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + struct dt_object *root; + struct dt_object *dto; + int rc; ENTRY; root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid); @@ -2422,7 +3346,10 @@ static int lfsck_namespace_reset(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - rc = lfsck_namespace_store(env, com, true); + lad->lad_incomplete = 0; + CFS_RESET_BITMAP(lad->lad_bitmap); + + rc = 
lfsck_namespace_store(env, com); GOTO(out, rc); @@ -2475,7 +3402,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); log: @@ -2497,17 +3424,18 @@ static int lfsck_namespace_prep(const struct lu_env *env, struct lfsck_position *pos = &com->lc_pos_start; int rc; - if (ns->ln_status == LS_COMPLETED) { + rc = lfsck_namespace_load_bitmap(env, com); + if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) { rc = lfsck_namespace_reset(env, com, false); if (rc == 0) rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true); + } - if (rc != 0) { - CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: " - "rc = %d\n", lfsck_lfsck2name(lfsck), rc); + if (rc != 0) { + CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n", + lfsck_lfsck2name(lfsck), rc); - return rc; - } + return rc; } down_write(&com->lc_sem); @@ -2535,7 +3463,6 @@ static int lfsck_namespace_prep(const struct lu_env *env, ns->ln_objs_repaired_phase2 = 0; ns->ln_objs_failed_phase2 = 0; ns->ln_objs_nlink_repaired = 0; - ns->ln_objs_lost_found = 0; ns->ln_dirent_repaired = 0; ns->ln_linkea_repaired = 0; ns->ln_mul_linked_checked = 0; @@ -2544,6 +3471,8 @@ static int lfsck_namespace_prep(const struct lu_env *env, ns->ln_unmatched_pairs_repaired = 0; ns->ln_dangling_repaired = 0; ns->ln_mul_ref_repaired = 0; + ns->ln_bad_type_repaired = 0; + ns->ln_lost_dirent_repaired = 0; fid_zero(&ns->ln_fid_latest_scanned_phase2); if (list_empty(&com->lc_link_dir)) list_add_tail(&com->lc_link_dir, @@ -2774,7 +3703,7 @@ static int lfsck_namespace_post(const struct lu_env *env, com->lc_new_checked = 0; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n", @@ -3001,17 +3930,53 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, struct 
lfsck_assistant_data *lad = com->lc_data; struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; struct lfsck_tgt_desc *ltd; + int rc; bool fail = false; ENTRY; - if (lr->lr_event != LE_PHASE1_DONE && - lr->lr_event != LE_PHASE2_DONE && - lr->lr_event != LE_PEER_EXIT) + switch (lr->lr_event) { + case LE_CREATE_ORPHAN: { + struct dt_object *orphan = NULL; + + CDEBUG(D_LFSCK, "%s: namespace LFSCK handling notify from " + "MDT %x to create orphan"DFID" with type %o\n", + lfsck_lfsck2name(lfsck), lr->lr_index, + PFID(&lr->lr_fid), lr->lr_type); + + orphan = lfsck_object_find(env, lfsck, &lr->lr_fid); + if (IS_ERR(orphan)) + GOTO(out_create, rc = PTR_ERR(orphan)); + + if (dt_object_exists(orphan)) + GOTO(out_create, rc = -EEXIST); + + rc = lfsck_namespace_create_orphan_local(env, com, orphan, + lr->lr_type); + + GOTO(out_create, rc = (rc == 1) ? 0 : rc); + +out_create: + CDEBUG(D_LFSCK, "%s: namespace LFSCK handled notify from " + "MDT %x to create orphan"DFID" with type %o: rc = %d\n", + lfsck_lfsck2name(lfsck), lr->lr_index, + PFID(&lr->lr_fid), lr->lr_type, rc); + + if (orphan != NULL && !IS_ERR(orphan)) + lfsck_object_put(env, orphan); + + return rc; + } + case LE_PHASE1_DONE: + case LE_PHASE2_DONE: + case LE_PEER_EXIT: + break; + default: RETURN(-EINVAL); + } CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, " - "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event, - lr->lr_index, lr->lr_status); + "status %d, flags %x\n", lfsck_lfsck2name(lfsck), lr->lr_event, + lr->lr_index, lr->lr_status, lr->lr_flags2); spin_lock(&ltds->ltd_lock); ltd = LTD_TGT(ltds, lr->lr_index); @@ -3036,6 +4001,9 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, break; } + if (lr->lr_flags2 & LF_INCOMPLETE) + ns->ln_flags |= LF_INCOMPLETE; + if (list_empty(&ltd->ltd_namespace_list)) list_add_tail(&ltd->ltd_namespace_list, &lad->lad_mdt_list); @@ -3111,7 +4079,7 @@ static struct lfsck_operations lfsck_namespace_ops = { * and the users can make the decision about how 
to handle it with * more human knownledge. (by default) * - * 2) Re-create the missed MDT-object with the FID information. + * 2) Re-create the missing MDT-object with the FID information. * * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component @@ -3320,14 +4288,14 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, struct dt_object *dir = lnr->lnr_obj; struct dt_object *obj = NULL; const struct lu_fid *pfid = lfsck_dto2fid(dir); - struct dt_device *dev; + struct dt_device *dev = NULL; struct lustre_handle lh = { 0 }; bool repaired = false; bool dtlocked = false; bool remove; bool newdata; bool log = false; - int idx; + int idx = 0; int count = 0; int rc; enum lfsck_namespace_inconsistency_type type = LNIT_NONE; @@ -3384,7 +4352,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env, CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which " "did not join the namespace LFSCK\n", lfsck_lfsck2name(lfsck), idx); - ns->ln_flags |= LF_INCOMPLETE; + lfsck_lad_set_bitmap(env, com, idx); GOTO(out, rc = -ENODEV); } @@ -3454,16 +4422,34 @@ again: } /* It may happen when the remote object has been removed, - * but the local MDT does not aware of that. */ + * but the local MDT is not aware of that. */ goto dangling; } else if (rc == 0) { count = ldata.ld_leh->leh_reccount; rc = linkea_links_find(&ldata, cname, pfid); if ((rc == 0) && - (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) + (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) { + if ((lfsck_object_type(obj) & S_IFMT) != + lnr->lnr_type) { + ns->ln_flags |= LF_INCONSISTENT; + type = LNIT_BAD_TYPE; + } + goto record; + } ns->ln_flags |= LF_INCONSISTENT; + + /* If the file type stored in the name entry does not match + * the file type claimed by the object, and the object does + * not recognize the name entry, then it is quite possible + * that the name entry is corrupted. 
*/ + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) { + type = LNIT_BAD_DIRENT; + + GOTO(stop, rc = 0); + } + /* For sub-dir object, we cannot make sure whether the sub-dir * back references the parent via ".." name entry correctly or * not in the LFSCK first-stage scanning. It may be that the @@ -3477,6 +4463,9 @@ again: newdata = false; goto nodata; } else if (unlikely(rc == -EINVAL)) { + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) + type = LNIT_BAD_TYPE; + count = 1; ns->ln_flags |= LF_INCONSISTENT; /* The magic crashed, we are not sure whether there are more @@ -3485,6 +4474,9 @@ again: newdata = true; goto nodata; } else if (rc == -ENODATA) { + if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) + type = LNIT_BAD_TYPE; + count = 1; ns->ln_flags |= LF_UPGRADE; remove = false; @@ -3574,6 +4566,34 @@ stop: out: lfsck_ibits_unlock(&lh, LCK_EX); + + if (rc >= 0) { + switch (type) { + case LNIT_BAD_TYPE: + log = false; + rc = lfsck_namespace_repair_dirent(env, com, dir, + obj, lnr->lnr_name, lnr->lnr_name, + lnr->lnr_type, true, false); + if (rc > 0) + repaired = true; + break; + case LNIT_BAD_DIRENT: + log = false; + /* XXX: This is a bad dirent, we do not know whether + * the original name entry reference a regular + * file or a directory, then keep the parent's + * nlink count unchanged here. 
*/ + rc = lfsck_namespace_repair_dirent(env, com, dir, + obj, lnr->lnr_name, lnr->lnr_name, + lnr->lnr_type, false, false); + if (rc > 0) + repaired = true; + break; + default: + break; + } + } + down_write(&com->lc_sem); if (rc < 0) { CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle " @@ -3583,6 +4603,12 @@ out: lnr->lnr_namelen, lnr->lnr_name, rc); lfsck_namespace_record_failure(env, lfsck, ns); + if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG || + rc == -ETIMEDOUT || rc == -EHOSTDOWN || + rc == -EHOSTUNREACH || rc == -EINPROGRESS) && + dev != NULL && dev != lfsck->li_next) + lfsck_lad_set_bitmap(env, com, idx); + if (!(bk->lb_param & LPF_FAILOUT)) rc = 0; } else { @@ -3601,6 +4627,12 @@ out: case LNIT_DANGLING: ns->ln_dangling_repaired++; break; + case LNIT_BAD_TYPE: + ns->ln_bad_type_repaired++; + break; + case LNIT_BAD_DIRENT: + ns->ln_dirent_repaired++; + break; default: break; } @@ -3611,12 +4643,14 @@ out: &ns->ln_pos_first_inconsistent, false); } + rc = 0; } up_write(&com->lc_sem); if (obj != NULL && !IS_ERR(obj)) lfsck_object_put(env, obj); + return rc; } @@ -3724,7 +4758,7 @@ checkpoint: ns->ln_time_last_checkpoint = cfs_time_current_sec(); ns->ln_objs_checked_phase2 += com->lc_new_checked; com->lc_new_checked = 0; - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); if (rc != 0) GOTO(put, rc); @@ -3805,17 +4839,108 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, ns->ln_status = LS_FAILED; } - rc = lfsck_namespace_store(env, com, false); + rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); return rc; } +static int +lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *args, int rc) +{ + return 0; +} + +/** + * Notify remote LFSCK instances about former failures. 
+ * + * The local LFSCK instance has recorded which MDTs have ever failed to respond + * to some LFSCK verification requests (maybe because of network issues or trouble + * on the MDT itself). During the response gap the MDT may have missed some name + * entry verifications, so the MDT cannot know whether related MDT-objects have been + * referenced by related name entries or not; then in the second-stage scanning, + * these MDT-objects will be regarded as orphans. If such an MDT-object contains bad + * linkEA for back reference, then it will misguide the LFSCK to generate wrong + * name entry for repairing the orphan. + * + * To avoid the above trouble, when the namespace LFSCK finishes the first-stage + * scanning, it will scan the bitmap for the ever failed MDTs, and notify them that + * they have ever missed some name entry verifications and should skip the handling + * for orphan MDT-objects. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] lr pointer to the lfsck request + */ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env, struct lfsck_component *com, struct lfsck_request *lr) { - /* XXX: TBD */ + struct lfsck_async_interpret_args *laia = + &lfsck_env_info(env)->lti_laia2; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct ptlrpc_request_set *set; + int rc = 0; + ENTRY; + + set = ptlrpc_prep_set(); + if (set == NULL) + GOTO(out, rc = -ENOMEM); + + lr->lr_flags2 = ns->ln_flags | LF_INCOMPLETE; + memset(laia, 0, sizeof(*laia)); + lad->lad_touch_gen++; + + spin_lock(&ltds->ltd_lock); + while (!list_empty(&lad->lad_mdt_list)) { + ltd = list_entry(lad->lad_mdt_list.next, + struct lfsck_tgt_desc, + ltd_namespace_list); + if (ltd->ltd_namespace_gen == lad->lad_touch_gen) + break; + + ltd->ltd_namespace_gen 
= lad->lad_touch_gen; + list_move_tail(&ltd->ltd_namespace_list, + &lad->lad_mdt_list); + if (!lad->lad_incomplete || + !cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) { + ltd->ltd_namespace_failed = 0; + continue; + } + + ltd->ltd_namespace_failed = 1; + spin_unlock(&ltds->ltd_lock); + rc = lfsck_async_request(env, ltd->ltd_exp, lr, set, + lfsck_namespace_assistant_sync_failures_interpret, + laia, LFSCK_NOTIFY); + if (rc != 0) + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " + "to sync failure with MDT %x: rc = %d\n", + lfsck_lfsck2name(lfsck), ltd->ltd_index, rc); + + spin_lock(&ltds->ltd_lock); + } + spin_unlock(&ltds->ltd_lock); + + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + + GOTO(out, rc); + +out: + if (rc != 0) + CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " + "to sync failure with MDTs, and related MDTs " + "may handle orphan un-properly: rc = %d\n", + lfsck_lfsck2name(lfsck), rc); + + EXIT; } struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {