X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_layout.c;h=98e61d236cb556aa983afacf5d1f2123d3604060;hb=refs%2Fchanges%2F46%2F10046%2F7;hp=f77ee876515f80bb9c32395f0f1b2528ded2483b;hpb=77eea1985bb1655e58c8b7df00703b4f08b58ec7;p=fs%2Flustre-release.git diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index f77ee87..98e61d236 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -349,26 +349,47 @@ again: static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm) { __u32 magic; - __u32 patten; + __u32 pattern; magic = le32_to_cpu(lmm->lmm_magic); /* If magic crashed, keep it there. Sometime later, during OST-object * orphan handling, if some OST-object(s) back-point to it, it can be * verified and repaired. */ - if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) - return -EINVAL; + if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) { + struct ost_id oi; + int rc; + + lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi); + if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC) + rc = -EOPNOTSUPP; + else + rc = -EINVAL; + + CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n", + rc == -EINVAL ? "Unknown" : "Unsupported", + magic, POSTID(&oi)); - patten = le32_to_cpu(lmm->lmm_pattern); + return rc; + } + + pattern = le32_to_cpu(lmm->lmm_pattern); /* XXX: currently, we only support LOV_PATTERN_RAID0. 
*/ - if (patten != LOV_PATTERN_RAID0) + if (lov_pattern(pattern) != LOV_PATTERN_RAID0) { + struct ost_id oi; + + lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi); + CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n", + pattern, POSTID(&oi)); + return -EOPNOTSUPP; + } return 0; } #define LFSCK_RBTREE_BITMAP_SIZE PAGE_CACHE_SIZE #define LFSCK_RBTREE_BITMAP_WIDTH (LFSCK_RBTREE_BITMAP_SIZE << 3) -#define LFSCK_RBTREE_BITMAP_MASK (LFSCK_RBTREE_BITMAP_SIZE - 1) +#define LFSCK_RBTREE_BITMAP_MASK (LFSCK_RBTREE_BITMAP_WIDTH - 1) struct lfsck_rbtree_node { struct rb_node lrn_node; @@ -392,7 +413,7 @@ static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn, if (oid < lrn->lrn_first_oid) return -1; - if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) + if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH) return 1; return 0; @@ -492,19 +513,19 @@ static struct lfsck_rbtree_node * lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd, struct lfsck_rbtree_node *lrn) { - struct rb_node **pos = &(llsd->llsd_rb_root.rb_node); + struct rb_node **pos = &llsd->llsd_rb_root.rb_node; struct rb_node *parent = NULL; struct lfsck_rbtree_node *tmp; int rc; - while (*pos) { + while (*pos != NULL) { parent = *pos; - tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node); + tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node); rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid); if (rc < 0) - pos = &((*pos)->rb_left); + pos = &(*pos)->rb_left; else if (rc > 0) - pos = &((*pos)->rb_right); + pos = &(*pos)->rb_right; else return tmp; } @@ -771,7 +792,8 @@ static int lfsck_layout_store(const struct lu_env *env, RETURN(rc); } - rc = dt_declare_record_write(env, obj, size, pos, handle); + rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size), + pos, handle); if (rc != 0) { CERROR("%s: fail to declare trans for storing lfsck_layout(1): " "rc = %d\n", lfsck_lfsck2name(lfsck), rc); @@ -913,7 +935,10 @@ lfsck_layout_lastid_create(const struct lu_env 
*env, if (rc != 0) GOTO(stop, rc); - rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th); + rc = dt_declare_record_write(env, obj, + lfsck_buf_get(env, &lastid, + sizeof(lastid)), + pos, th); if (rc != 0) GOTO(stop, rc); @@ -1033,8 +1058,11 @@ lfsck_layout_lastid_store(const struct lu_env *env, continue; } + lastid = cpu_to_le64(lls->lls_lastid); rc = dt_declare_record_write(env, lls->lls_lastid_obj, - sizeof(lastid), pos, th); + lfsck_buf_get(env, &lastid, + sizeof(lastid)), + pos, th); if (rc != 0) goto stop; @@ -1042,7 +1070,6 @@ lfsck_layout_lastid_store(const struct lu_env *env, if (rc != 0) goto stop; - lastid = cpu_to_le64(lls->lls_lastid); dt_write_lock(env, lls->lls_lastid_obj, 0); rc = dt_record_write(env, lls->lls_lastid_obj, lfsck_buf_get(env, &lastid, @@ -1138,6 +1165,17 @@ out: return rc; } +static void lfsck_layout_record_failure(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct lfsck_layout *lo) +{ + lo->ll_objs_failed_phase1++; + if (unlikely(lo->ll_pos_first_inconsistent == 0)) + lo->ll_pos_first_inconsistent = + lfsck->li_obj_oit->do_index_ops->dio_it.store(env, + lfsck->li_di_oit); +} + static int lfsck_layout_master_async_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *args, int rc) @@ -1562,7 +1600,6 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env, struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; down_write(&com->lc_sem); - lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() + HALF_SEC - lfsck->li_time_last_checkpoint); lo->ll_time_last_checkpoint = cfs_time_current_sec(); @@ -1586,15 +1623,7 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env, lo->ll_status = LS_FAILED; } - if (lo->ll_status != LS_PAUSED) { - spin_lock(&lfsck->li_lock); - list_del_init(&com->lc_link); - list_add_tail(&com->lc_link, &lfsck->li_list_idle); - spin_unlock(&lfsck->li_lock); - } - rc = lfsck_layout_store(env, com); - up_write(&com->lc_sem); return rc; @@ 
-1645,27 +1674,1018 @@ static int lfsck_layout_trans_stop(const struct lu_env *env, { int rc; - handle->th_result = result; - rc = dt_trans_stop(env, dev, handle); - if (rc > 0) - rc = 0; - else if (rc == 0) - rc = 1; + handle->th_result = result; + rc = dt_trans_stop(env, dev, handle); + if (rc > 0) + rc = 0; + else if (rc == 0) + rc = 1; + + return rc; +} + +/** + * Get the system default stripe size. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[out] size pointer to the default stripe size + * + * \retval 0 for success + * \retval negative error number on failure + */ +static int lfsck_layout_get_def_stripesize(const struct lu_env *env, + struct lfsck_instance *lfsck, + __u32 *size) +{ + struct lov_user_md *lum = &lfsck_env_info(env)->lti_lum; + struct dt_object *root; + int rc; + + root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid); + if (IS_ERR(root)) + return PTR_ERR(root); + + /* Get the default stripe size via xattr_get on the backend root. */ + rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)), + XATTR_NAME_LOV, BYPASS_CAPA); + if (rc > 0) { + /* The lum->lmm_stripe_size is LE mode. The *size also + * should be LE mode. So it is unnecessary to convert. 
*/ + *size = lum->lmm_stripe_size; + rc = 0; + } else if (unlikely(rc == 0)) { + rc = -EINVAL; + } + + lfsck_object_put(env, root); + + return rc; +} + +/** + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_refill_lovea(const struct lu_env *env, + struct thandle *handle, + struct dt_object *parent, + struct lu_fid *cfid, + struct lu_buf *buf, + struct lov_ost_data_v1 *slot, + int fl, __u32 ost_idx) +{ + struct ost_id *oi = &lfsck_env_info(env)->lti_oi; + struct lov_mds_md_v1 *lmm = buf->lb_buf; + int rc; + + fid_to_ostid(cfid, oi); + ostid_cpu_to_le(oi, &slot->l_ost_oi); + slot->l_ost_gen = cpu_to_le32(0); + slot->l_ost_idx = cpu_to_le32(ost_idx); + + if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) { + struct lov_ost_data_v1 *objs; + int i; + __u16 count; + + count = le16_to_cpu(lmm->lmm_stripe_count); + if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) + objs = &lmm->lmm_objects[0]; + else + objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; + for (i = 0; i < count; i++, objs++) { + if (objs != slot && lovea_slot_is_dummy(objs)) + break; + } + + /* If the @slot is the last dummy slot to be refilled, + * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. 
*/ + if (i == count) + lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE); + } + + rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle, + BYPASS_CAPA); + if (rc == 0) + rc = 1; + + return rc; +} + +/** + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_extend_lovea(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct thandle *handle, + struct dt_object *parent, + struct lu_fid *cfid, + struct lu_buf *buf, int fl, + __u32 ost_idx, __u32 ea_off, bool reset) +{ + struct lov_mds_md_v1 *lmm = buf->lb_buf; + struct lov_ost_data_v1 *objs; + int rc; + __u16 count; + ENTRY; + + if (fl == LU_XATTR_CREATE || reset) { + __u32 pattern = LOV_PATTERN_RAID0; + + count = ea_off + 1; + LASSERT(buf->lb_len == lov_mds_md_size(count, LOV_MAGIC_V1)); + + if (ea_off != 0 || reset) + pattern |= LOV_PATTERN_F_HOLE; + + memset(lmm, 0, buf->lb_len); + lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1); + lmm->lmm_pattern = cpu_to_le32(pattern); + fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi); + lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi); + + rc = lfsck_layout_get_def_stripesize(env, lfsck, + &lmm->lmm_stripe_size); + if (rc != 0) + RETURN(rc); + + objs = &lmm->lmm_objects[ea_off]; + } else { + __u32 magic = le32_to_cpu(lmm->lmm_magic); + int gap; + + count = le16_to_cpu(lmm->lmm_stripe_count); + if (magic == LOV_MAGIC_V1) + objs = &lmm->lmm_objects[count]; + else + objs = &((struct lov_mds_md_v3 *)lmm)-> + lmm_objects[count]; + + gap = ea_off - count; + if (gap >= 0) + count = ea_off + 1; + LASSERT(buf->lb_len == lov_mds_md_size(count, magic)); + + if (gap > 0) { + memset(objs, 0, gap * sizeof(*objs)); + lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE); + } + + lmm->lmm_layout_gen = + cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1); + objs += gap; + } + + lmm->lmm_stripe_count = cpu_to_le16(count); + rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs, + fl, ost_idx); + + RETURN(rc); +} + 
+/** + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_update_pfid(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *parent, + struct lu_fid *cfid, + struct dt_device *cdev, __u32 ea_off) +{ + struct filter_fid *pfid = &lfsck_env_info(env)->lti_new_pfid; + struct dt_object *child; + struct thandle *handle; + const struct lu_fid *tfid = lu_object_fid(&parent->do_lu); + struct lu_buf *buf; + int rc = 0; + ENTRY; + + child = lfsck_object_find_by_dev(env, cdev, cfid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + + handle = dt_trans_create(env, cdev); + if (IS_ERR(handle)) + GOTO(out, rc = PTR_ERR(handle)); + + pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq); + pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid); + /* Currently, the filter_fid::ff_parent::f_ver is not the real parent + * MDT-object's FID::f_ver, instead it is the OST-object index in its + * parent MDT-object's layout EA. */ + pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off); + buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid)); + + rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start(env, cdev, handle); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle, + BYPASS_CAPA); + + GOTO(stop, rc = (rc == 0 ? 
1 : rc)); + +stop: + dt_trans_stop(env, cdev, handle); + +out: + lu_object_put(env, &child->do_lu); + + return rc; +} + +/** + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_recreate_parent(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_tgt_desc *ltd, + struct lu_orphan_rec *rec, + struct lu_fid *cfid, + const char *prefix, + const char *postfix, + __u32 ea_off) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + char *name = info->lti_key; + struct lu_attr *la = &info->lti_la; + struct dt_object_format *dof = &info->lti_dof; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lu_fid *pfid = &rec->lor_fid; + struct lu_fid *tfid = &info->lti_fid3; + struct dt_device *next = lfsck->li_next; + struct dt_object *pobj = NULL; + struct dt_object *cobj = NULL; + struct thandle *th = NULL; + struct lu_buf *pbuf = NULL; + struct lu_buf *ea_buf = &info->lti_big_buf; + struct lustre_handle lh = { 0 }; + int buflen = ea_buf->lb_len; + int idx = 0; + int rc = 0; + ENTRY; + + /* Create .lustre/lost+found/MDTxxxx when needed. */ + if (unlikely(lfsck->li_lpf_obj == NULL)) { + rc = lfsck_create_lpf(env, lfsck); + if (rc != 0) + RETURN(rc); + } + + if (fid_is_zero(pfid)) { + struct filter_fid *ff = &info->lti_new_pfid; + + rc = lfsck_fid_alloc(env, lfsck, pfid, false); + if (rc != 0) + RETURN(rc); + + ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq); + ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid); + /* Currently, the filter_fid::ff_parent::f_ver is not the + * real parent MDT-object's FID::f_ver, instead it is the + * OST-object index in its parent MDT-object's layout EA. 
*/ + ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off); + pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid)); + cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid); + if (IS_ERR(cobj)) + RETURN(PTR_ERR(cobj)); + } + + CDEBUG(D_LFSCK, "Re-create the lost MDT-object: parent " + DFID", child "DFID", OST-index %u, stripe-index %u, " + "prefix %s, postfix %s\n", + PFID(pfid), PFID(cfid), ltd->ltd_index, ea_off, prefix, postfix); + + pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid); + if (IS_ERR(pobj)) + GOTO(put, rc = PTR_ERR(pobj)); + + LASSERT(prefix != NULL); + LASSERT(postfix != NULL); + + /** name rules: + * + * 1. Use the MDT-object's FID as the name with prefix and postfix. + * + * 1.1 prefix "C-": More than one OST-objects claim the same + * MDT-object and the same slot in the layout EA. + * It may be created for dangling referenced MDT + * object or may be not. + * 1.2 prefix "N-": The orphan OST-object does not know which one + * is the real parent, so the LFSCK assign a new + * FID as its parent. + * 1.3 prefix "R-": The orphan OST-object know its parent FID but + * does not know the position in the namespace. + * + * 2. If there is name conflict, append more index for new name. */ + sprintf(name, "%s"DFID"%s", prefix, PFID(pfid), postfix); + do { + rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid, + (const struct dt_key *)name, BYPASS_CAPA); + if (rc != 0 && rc != -ENOENT) + GOTO(put, rc); + + if (unlikely(rc == 0)) { + CWARN("%s: The name %s under lost+found has been used " + "by the "DFID". 
Try to increase the FID version " + "for the new file name.\n", + lfsck_lfsck2name(lfsck), name, PFID(tfid)); + sprintf(name, "%s"DFID"%s-%d", prefix, PFID(pfid), + postfix, ++idx); + } + } while (rc == 0); + + memset(la, 0, sizeof(*la)); + la->la_uid = rec->lor_uid; + la->la_gid = rec->lor_gid; + la->la_mode = S_IFREG | S_IRUSR; + la->la_valid = LA_MODE | LA_UID | LA_GID; + + memset(dof, 0, sizeof(*dof)); + dof->dof_type = dt_mode_to_dft(S_IFREG); + + rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1); + if (buflen < rc) { + lu_buf_realloc(ea_buf, rc); + buflen = ea_buf->lb_len; + if (ea_buf->lb_buf == NULL) + GOTO(put, rc = -ENOMEM); + } else { + ea_buf->lb_len = rc; + } + + /* Hold update lock on the .lustre/lost+found/MDTxxxx/. + * + * XXX: Currently, we do not grab the PDO lock as normal create cases, + * because creating MDT-object for orphan OST-object is rare, we + * do not much care about the performance. It can be improved in + * the future when needed. */ + rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh, + MDS_INODELOCK_UPDATE); + if (rc != 0) + GOTO(put, rc); + + th = dt_trans_create(env, next); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + /* 1a. Update OST-object's parent information remotely. + * + * If other subsequent modifications failed, then next LFSCK scanning + * will process the OST-object as orphan again with known parent FID. */ + if (cobj != NULL) { + rc = dt_declare_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th); + if (rc != 0) + GOTO(stop, rc); + } + + /* 2a. Create the MDT-object locally. */ + rc = dt_declare_create(env, pobj, la, NULL, dof, th); + if (rc != 0) + GOTO(stop, rc); + + /* 3a. Add layout EA for the MDT-object. */ + rc = dt_declare_xattr_set(env, pobj, ea_buf, XATTR_NAME_LOV, + LU_XATTR_CREATE, th); + if (rc != 0) + GOTO(stop, rc); + + /* 4a. 
Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */ + rc = dt_declare_insert(env, lfsck->li_lpf_obj, + (const struct dt_rec *)pfid, + (const struct dt_key *)name, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start(env, next, th); + if (rc != 0) + GOTO(stop, rc); + + /* 1b. Update OST-object's parent information remotely. */ + if (cobj != NULL) { + rc = dt_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th, + BYPASS_CAPA); + if (rc != 0) + GOTO(stop, rc); + } + + dt_write_lock(env, pobj, 0); + /* 2b. Create the MDT-object locally. */ + rc = dt_create(env, pobj, la, NULL, dof, th); + if (rc == 0) + /* 3b. Add layout EA for the MDT-object. */ + rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid, + ea_buf, LU_XATTR_CREATE, + ltd->ltd_index, ea_off, false); + dt_write_unlock(env, pobj); + if (rc < 0) + GOTO(stop, rc); + + /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */ + rc = dt_insert(env, lfsck->li_lpf_obj, + (const struct dt_rec *)pfid, + (const struct dt_key *)name, th, BYPASS_CAPA, 1); + + GOTO(stop, rc); + +stop: + dt_trans_stop(env, next, th); + +unlock: + lfsck_layout_unlock(&lh); + +put: + if (cobj != NULL && !IS_ERR(cobj)) + lu_object_put(env, &cobj->do_lu); + if (pobj != NULL && !IS_ERR(pobj)) + lu_object_put(env, &pobj->do_lu); + ea_buf->lb_len = buflen; + + return rc >= 0 ? 
1 : rc; +} + +static int lfsck_layout_master_conditional_destroy(const struct lu_env *env, + struct lfsck_component *com, + const struct lu_fid *fid, + __u32 index) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lfsck_request *lr = &info->lti_lr; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_tgt_desc *ltd; + struct ptlrpc_request *req; + struct lfsck_request *tmp; + struct obd_export *exp; + int rc = 0; + ENTRY; + + ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index); + if (unlikely(ltd == NULL)) + RETURN(-ENXIO); + + exp = ltd->ltd_exp; + if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK)) + GOTO(put, rc = -EOPNOTSUPP); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY); + if (req == NULL) + GOTO(put, rc = -ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY); + if (rc != 0) { + ptlrpc_request_free(req); + + GOTO(put, rc); + } + + memset(lr, 0, sizeof(*lr)); + lr->lr_event = LE_CONDITIONAL_DESTROY; + lr->lr_active = LT_LAYOUT; + lr->lr_fid = *fid; + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST); + *tmp = *lr; + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + ptlrpc_req_finished(req); + + GOTO(put, rc); + +put: + lfsck_tgt_put(ltd); + + return rc; +} + +static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_request *lr) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la; + ldlm_policy_data_t *policy = &info->lti_policy; + struct ldlm_res_id *resid = &info->lti_resid; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_bottom; + struct lu_fid *fid = &lr->lr_fid; + struct dt_object *obj; + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + __u64 flags = 0; + int rc = 0; + ENTRY; + + obj = lfsck_object_find_by_dev(env, dev, fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + dt_read_lock(env, 
obj, 0); + if (dt_object_exists(obj) == 0) { + dt_read_unlock(env, obj); + + GOTO(put, rc = -ENOENT); + } + + /* Get obj's attr without lock firstly. */ + rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + dt_read_unlock(env, obj); + if (rc != 0) + GOTO(put, rc); + + if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID)) + GOTO(put, rc = -ETXTBSY); + + /* Acquire extent lock on [0, EOF] to sync with all possible written. */ + LASSERT(lfsck->li_namespace != NULL); + + memset(policy, 0, sizeof(*policy)); + policy->l_extent.end = OBD_OBJECT_EOF; + ost_fid_build_resid(fid, resid); + rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT, + policy, LCK_EX, &flags, ldlm_blocking_ast, + ldlm_completion_ast, NULL, NULL, 0, + LVB_T_NONE, NULL, &lh); + if (rc != ELDLM_OK) + GOTO(put, rc = -EIO); + + dt_write_lock(env, obj, 0); + /* Get obj's attr within lock again. */ + rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + if (rc != 0) + GOTO(unlock, rc); + + if (la->la_ctime != 0) + GOTO(unlock, rc = -ETXTBSY); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rc = dt_declare_ref_del(env, obj, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_declare_destroy(env, obj, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_ref_del(env, obj, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_destroy(env, obj, th); + if (rc == 0) + CDEBUG(D_LFSCK, "Destroy the empty OST-object "DFID" which " + "was created for reparing dangling referenced case. " + "But the original missed OST-object is found now.\n", + PFID(fid)); + + GOTO(stop, rc); + +stop: + dt_trans_stop(env, dev, th); + +unlock: + dt_write_unlock(env, obj); + ldlm_lock_decref(&lh, LCK_EX); + +put: + lu_object_put(env, &obj->do_lu); + + return rc; +} + +/** + * Some OST-object has occupied the specified layout EA slot. 
+ * Such OST-object may be generated by the LFSCK when repair + * dangling referenced MDT-object, which can be indicated by + * attr::la_ctime == 0 but without S_ISUID in la_mode. If it + * is true and such OST-object has not been modified yet, we + * will replace it with the orphan OST-object; otherwise the + * LFSCK will create new MDT-object to reference the orphan. + * + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_conflict_create(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_tgt_desc *ltd, + struct lu_orphan_rec *rec, + struct dt_object *parent, + struct lu_fid *cfid, + struct lu_buf *ea_buf, + struct lov_ost_data_v1 *slot, + __u32 ea_off, __u32 ori_len) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_fid *cfid2 = &info->lti_fid2; + struct ost_id *oi = &info->lti_oi; + char *postfix = info->lti_tmpbuf; + struct lov_mds_md_v1 *lmm = ea_buf->lb_buf; + struct dt_device *dev = com->lc_lfsck->li_bottom; + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + __u32 ost_idx2 = le32_to_cpu(slot->l_ost_idx); + int rc = 0; + ENTRY; + + ostid_le_to_cpu(&slot->l_ost_oi, oi); + ostid_to_fid(cfid2, oi, ost_idx2); + + CDEBUG(D_LFSCK, "Handle layout EA conflict: parent "DFID + ", cur-child "DFID" on the OST %u, orphan-child " + DFID" on the OST %u, stripe-index %u\n", + PFID(lfsck_dto2fid(parent)), PFID(cfid2), ost_idx2, + PFID(cfid), ltd->ltd_index, ea_off); + + /* Hold layout lock on the parent to prevent others to access. */ + rc = lfsck_layout_lock(env, com, parent, &lh, + MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR); + if (rc != 0) + GOTO(out, rc); + + rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2); + + /* If the conflict OST-obejct is not created for fixing dangling + * referenced MDT-object in former LFSCK check/repair, or it has + * been modified by others, then we cannot destroy it. 
Re-create + * a new MDT-object for the orphan OST-object. */ + if (rc == -ETXTBSY) { + /* No need the layout lock on the original parent. */ + lfsck_layout_unlock(&lh); + ea_buf->lb_len = ori_len; + + fid_zero(&rec->lor_fid); + snprintf(postfix, LFSCK_TMPBUF_LEN, "-"DFID"-%x", + PFID(lu_object_fid(&parent->do_lu)), ea_off); + rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid, + "C-", postfix, ea_off); + + RETURN(rc); + } + + if (rc != 0 && rc != -ENOENT) + GOTO(unlock, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, parent, 0); + lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1); + rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot, + LU_XATTR_REPLACE, ltd->ltd_index); + dt_write_unlock(env, parent); + + GOTO(stop, rc); + +stop: + dt_trans_stop(env, dev, th); + +unlock: + lfsck_layout_unlock(&lh); + +out: + ea_buf->lb_len = ori_len; + + return rc >= 0 ? 
1 : rc; +} + +/** + * \retval +1: repaired + * \retval 0: did nothing + * \retval -ve: on error + */ +static int lfsck_layout_recreate_lovea(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_tgt_desc *ltd, + struct lu_orphan_rec *rec, + struct dt_object *parent, + struct lu_fid *cfid, + __u32 ost_idx, __u32 ea_off) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_buf *buf = &info->lti_big_buf; + struct lu_fid *fid = &info->lti_fid2; + struct ost_id *oi = &info->lti_oi; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dt = lfsck->li_bottom; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct thandle *handle = NULL; + size_t buflen = buf->lb_len; + struct lov_mds_md_v1 *lmm; + struct lov_ost_data_v1 *objs; + struct lustre_handle lh = { 0 }; + __u32 magic; + int fl = 0; + int rc = 0; + int rc1; + int i; + __u16 count; + bool locked = false; + ENTRY; + + CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent " + DFID", child "DFID", OST-index %u, stripe-index %u\n", + PFID(lfsck_dto2fid(parent)), PFID(cfid), ost_idx, ea_off); + + rc = lfsck_layout_lock(env, com, parent, &lh, + MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR); + if (rc != 0) + RETURN(rc); + +again: + if (locked) { + dt_write_unlock(env, parent); + locked = false; + } + + if (handle != NULL) { + dt_trans_stop(env, dt, handle); + handle = NULL; + } + + if (rc < 0) + GOTO(unlock_layout, rc); + + if (buf->lb_len < rc) { + lu_buf_realloc(buf, rc); + buflen = buf->lb_len; + if (buf->lb_buf == NULL) + GOTO(unlock_layout, rc = -ENOMEM); + } + + if (!(bk->lb_param & LPF_DRYRUN)) { + handle = dt_trans_create(env, dt); + if (IS_ERR(handle)) + GOTO(unlock_layout, rc = PTR_ERR(handle)); + + rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV, + fl, handle); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dt, handle); + if (rc != 0) + GOTO(stop, rc); + } + + dt_write_lock(env, parent, 0); + locked = true; + rc = 
dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA); + if (rc == -ERANGE) { + rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV, + BYPASS_CAPA); + LASSERT(rc != 0); + goto again; + } else if (rc == -ENODATA || rc == 0) { + rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1); + /* If the declared is not big enough, re-try. */ + if (buf->lb_len < rc) + goto again; + + fl = LU_XATTR_CREATE; + } else if (rc < 0) { + GOTO(unlock_parent, rc); + } else if (unlikely(buf->lb_len == 0)) { + goto again; + } else { + fl = LU_XATTR_REPLACE; + } + + if (fl == LU_XATTR_CREATE) { + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + LASSERT(buf->lb_len >= rc); + + buf->lb_len = rc; + rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid, + buf, fl, ost_idx, ea_off, false); + + GOTO(unlock_parent, rc); + } + + lmm = buf->lb_buf; + rc1 = lfsck_layout_verify_header(lmm); + + /* If the LOV EA crashed, the rebuild it. */ + if (rc1 == -EINVAL) { + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + LASSERT(buf->lb_len >= rc); + + buf->lb_len = rc; + memset(lmm, 0, buf->lb_len); + rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid, + buf, fl, ost_idx, ea_off, true); + + GOTO(unlock_parent, rc); + } + + /* For other unknown magic/pattern, keep the current LOV EA. */ + if (rc1 != 0) + GOTO(unlock_parent, rc = rc1); + + /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has + * been verified in lfsck_layout_verify_header() already. If some + * new magic introduced in the future, then layout LFSCK needs to + * be updated also. */ + magic = le32_to_cpu(lmm->lmm_magic); + if (magic == LOV_MAGIC_V1) { + objs = &lmm->lmm_objects[0]; + } else { + LASSERT(magic == LOV_MAGIC_V3); + objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; + } + + count = le16_to_cpu(lmm->lmm_stripe_count); + if (count == 0) + GOTO(unlock_parent, rc = -EINVAL); + LASSERT(count > 0); + + /* Exceed the current end of MDT-object layout EA. 
Then extend it. */ + if (count <= ea_off) { + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + rc = lov_mds_md_size(ea_off + 1, magic); + /* If the declared is not big enough, re-try. */ + if (buf->lb_len < rc) + goto again; + + buf->lb_len = rc; + rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid, + buf, fl, ost_idx, ea_off, false); + + GOTO(unlock_parent, rc); + } + + LASSERTF(rc > 0, "invalid rc = %d\n", rc); + + buf->lb_len = rc; + for (i = 0; i < count; i++, objs++) { + /* The MDT-object was created via lfsck_layout_recover_create() + * by others before, and we fill the dummy layout EA. */ + if (lovea_slot_is_dummy(objs)) { + if (i != ea_off) + continue; + + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + lmm->lmm_layout_gen = + cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1); + rc = lfsck_layout_refill_lovea(env, handle, parent, + cfid, buf, objs, fl, + ost_idx); + GOTO(unlock_parent, rc); + } + + ostid_le_to_cpu(&objs->l_ost_oi, oi); + ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx)); + /* It should be rare case, the slot is there, but the LFSCK + * does not handle it during the first-phase cycle scanning. */ + if (unlikely(lu_fid_eq(fid, cfid))) { + if (i == ea_off) { + GOTO(unlock_parent, rc = 0); + } else { + /* Rare case that the OST-object index + * does not match the parent MDT-object + * layout EA. We trust the later one. */ + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + dt_write_unlock(env, parent); + if (handle != NULL) + dt_trans_stop(env, dt, handle); + lfsck_layout_unlock(&lh); + buf->lb_len = buflen; + rc = lfsck_layout_update_pfid(env, com, parent, + cfid, ltd->ltd_tgt, i); + + RETURN(rc); + } + } + } + + /* The MDT-object exists, but related layout EA slot is occupied + * by others. 
*/ + if (bk->lb_param & LPF_DRYRUN) + GOTO(unlock_parent, rc = 1); + + dt_write_unlock(env, parent); + if (handle != NULL) + dt_trans_stop(env, dt, handle); + lfsck_layout_unlock(&lh); + if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) + objs = &lmm->lmm_objects[ea_off]; + else + objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off]; + rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid, + buf, objs, ea_off, buflen); + + RETURN(rc); + +unlock_parent: + if (locked) + dt_write_unlock(env, parent); + +stop: + if (handle != NULL) + dt_trans_stop(env, dt, handle); + +unlock_layout: + lfsck_layout_unlock(&lh); + buf->lb_len = buflen; + + return rc; +} + +static int lfsck_layout_scan_orphan_one(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_tgt_desc *ltd, + struct lu_orphan_rec *rec, + struct lu_fid *cfid) +{ + struct lfsck_layout *lo = com->lc_file_ram; + struct lu_fid *pfid = &rec->lor_fid; + struct dt_object *parent = NULL; + __u32 ea_off = pfid->f_stripe_idx; + int rc = 0; + ENTRY; + + if (!fid_is_sane(cfid)) + GOTO(out, rc = -EINVAL); + + if (fid_is_zero(pfid)) { + rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid, + "N-", "", ea_off); + GOTO(out, rc); + } + + pfid->f_ver = 0; + if (!fid_is_sane(pfid)) + GOTO(out, rc = -EINVAL); + + parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid); + if (IS_ERR(parent)) + GOTO(out, rc = PTR_ERR(parent)); + + if (unlikely(dt_object_remote(parent) != 0)) + GOTO(put, rc = -EXDEV); + + if (dt_object_exists(parent) == 0) { + lu_object_put(env, &parent->do_lu); + rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid, + "R-", "", ea_off); + GOTO(out, rc); + } + + if (!S_ISREG(lu_object_attr(&parent->do_lu))) + GOTO(put, rc = -EISDIR); - return rc; -} + rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid, + ltd->ltd_index, ea_off); -static int lfsck_layout_scan_orphan_one(const struct lu_env *env, - struct lfsck_component *com, - struct 
lfsck_tgt_desc *ltd, - struct lu_orphan_rec *rec, - struct lu_fid *cfid) -{ - struct lfsck_layout *lo = com->lc_file_ram; - int rc = 0; + GOTO(put, rc); - /* XXX: To be extended in other patch. */ +put: + if (rc <= 0) + lu_object_put(env, &parent->do_lu); + else + /* The layout EA is changed, need to be reloaded next time. */ + lu_object_put_nocache(env, &parent->do_lu); +out: down_write(&com->lc_sem); com->lc_new_scanned++; com->lc_new_checked++; @@ -1738,6 +2758,18 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env, struct dt_key *key; struct lu_orphan_rec *rec = &info->lti_rec; + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) && + cfs_fail_val > 0) { + struct ptlrpc_thread *thread = &lfsck->li_thread; + struct l_wait_info lwi; + + lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), + NULL, NULL); + l_wait_event(thread->t_ctl_waitq, + !thread_is_running(thread), + &lwi); + } + key = iops->key(env, di); com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key; rc = iops->rec(env, di, (struct dt_rec *)rec, 0); @@ -1767,16 +2799,24 @@ put: return rc > 0 ? 0 : rc; } -/* For the MDT-object with dangling reference, we need to re-create - * the missed OST-object with the known FID/owner information. */ -static int lfsck_layout_recreate_ostobj(const struct lu_env *env, +/* For the MDT-object with dangling reference, we need to repair the + * inconsistency according to the LFSCK sponsor's requirement: + * + * 1) Keep the inconsistency there and report the inconsistency case, + * then give the chance to the application to find related issues, + * and the users can make the decision about how to handle it with + * more human knowledge. (by default) + * + * 2) Re-create the missed OST-object with the FID/owner information.
*/ +static int lfsck_layout_repair_dangling(const struct lu_env *env, struct lfsck_component *com, struct lfsck_layout_req *llr, - struct lu_attr *la) + const struct lu_attr *pla) { struct lfsck_thread_info *info = lfsck_env_info(env); struct filter_fid *pfid = &info->lti_new_pfid; struct dt_allocation_hint *hint = &info->lti_hint; + struct lu_attr *cla = &info->lti_la2; struct dt_object *parent = llr->llr_parent->llo_obj; struct dt_object *child = llr->llr_child; struct dt_device *dev = lfsck_obj2dt_dev(child); @@ -1785,12 +2825,30 @@ static int lfsck_layout_recreate_ostobj(const struct lu_env *env, struct lu_buf *buf; struct lustre_handle lh = { 0 }; int rc; + bool create; ENTRY; - CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID - ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n", + if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ) + create = true; + else + create = false; + + CDEBUG(D_LFSCK, "Found dangling reference for: parent "DFID + ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u. %s", PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)), - llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid); + llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid, + create ? 
"Create the lost OST-object as required.\n" : + "Keep the MDT-object there by default.\n"); + + if (!create) + RETURN(1); + + memset(cla, 0, sizeof(*cla)); + cla->la_uid = pla->la_uid; + cla->la_gid = pla->la_gid; + cla->la_mode = S_IFREG | 0666; + cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | + LA_ATIME | LA_MTIME | LA_CTIME; rc = lfsck_layout_lock(env, com, parent, &lh, MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR); @@ -1805,10 +2863,13 @@ static int lfsck_layout_recreate_ostobj(const struct lu_env *env, hint->dah_mode = 0; pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq); pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid); - pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx); + /* Currently, the filter_fid::ff_parent::f_ver is not the real parent + * MDT-object's FID::f_ver, instead it is the OST-object index in its + * parent MDT-object's layout EA. */ + pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx); buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid)); - rc = dt_declare_create(env, child, la, hint, NULL, handle); + rc = dt_declare_create(env, child, cla, hint, NULL, handle); if (rc != 0) GOTO(stop, rc); @@ -1825,7 +2886,7 @@ static int lfsck_layout_recreate_ostobj(const struct lu_env *env, if (unlikely(lu_object_is_dying(parent->do_lu.lo_header))) GOTO(unlock2, rc = 1); - rc = dt_create(env, child, la, hint, NULL, handle); + rc = dt_create(env, child, cla, hint, NULL, handle); if (rc != 0) GOTO(unlock2, rc); @@ -1883,9 +2944,10 @@ static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env, pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq); pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid); - /* The ff_parent->f_ver is not the real parent fid->f_ver. Instead, - * it is the OST-object index in the parent MDT-object layout. 
*/ - pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx); + /* Currently, the filter_fid::ff_parent::f_ver is not the real parent + * MDT-object's FID::f_ver, instead it is the OST-object index in its + * parent MDT-object's layout EA. */ + pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx); buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid)); rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle); @@ -2011,10 +3073,6 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env, GOTO(unlock2, rc = 0); lmm = buf->lb_buf; - rc = lfsck_layout_verify_header(lmm); - if (rc != 0) - GOTO(unlock2, rc); - /* Someone change layout during the LFSCK, no need to repair then. */ if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen) GOTO(unlock2, rc = 0); @@ -2029,7 +3087,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env, * be updated also. */ magic = le32_to_cpu(lmm->lmm_magic); if (magic == LOV_MAGIC_V1) { - objs = &(lmm->lmm_objects[0]); + objs = &lmm->lmm_objects[0]; } else { LASSERT(magic == LOV_MAGIC_V3); objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; @@ -2201,17 +3259,9 @@ static int lfsck_layout_check_parent(const struct lu_env *env, GOTO(out, rc); lmm = buf->lb_buf; - rc = lfsck_layout_verify_header(lmm); - if (rc != 0) - GOTO(out, rc); - - /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has - * been verified in lfsck_layout_verify_header() already. If some - * new magic introduced in the future, then layout LFSCK needs to - * be updated also. 
*/ magic = le32_to_cpu(lmm->lmm_magic); if (magic == LOV_MAGIC_V1) { - objs = &(lmm->lmm_objects[0]); + objs = &lmm->lmm_objects[0]; } else { LASSERT(magic == LOV_MAGIC_V3); objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; @@ -2222,6 +3272,9 @@ static int lfsck_layout_check_parent(const struct lu_env *env, struct lu_fid *tfid = &info->lti_fid2; struct ost_id *oi = &info->lti_oi; + if (lovea_slot_is_dummy(objs)) + continue; + ostid_le_to_cpu(&objs->l_ost_oi, oi); ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx)); if (lu_fid_eq(cfid, tfid)) { @@ -2294,10 +3347,10 @@ static int lfsck_layout_assistant_handle_one(const struct lu_env *env, fid_zero(pfid); } else { fid_le_to_cpu(pfid, &pea->ff_parent); - /* OST-object does not save parent FID::f_ver, instead, - * the OST-object index in the parent MDT-object layout - * EA reuses the pfid->f_ver. */ - idx = pfid->f_ver; + /* Currently, the filter_fid::ff_parent::f_ver is not the + * real parent MDT-object's FID::f_ver, instead it is the + * OST-object index in its parent MDT-object's layout EA. */ + idx = pfid->f_stripe_idx; pfid->f_ver = 0; } @@ -2328,13 +3381,7 @@ repair: switch (type) { case LLIT_DANGLING: - memset(cla, 0, sizeof(*cla)); - cla->la_uid = pla->la_uid; - cla->la_gid = pla->la_gid; - cla->la_mode = S_IFREG | 0666; - cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | - LA_ATIME | LA_MTIME | LA_CTIME; - rc = lfsck_layout_recreate_ostobj(env, com, llr, cla); + rc = lfsck_layout_repair_dangling(env, com, llr, pla); break; case LLIT_UNMATCHED_PAIR: rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla); @@ -2356,23 +3403,33 @@ repair: out: down_write(&com->lc_sem); if (rc < 0) { - /* If cannot touch the target server, - * mark the LFSCK as INCOMPLETE. 
*/ - if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT || - rc == -EHOSTDOWN || rc == -EHOSTUNREACH) { + struct lfsck_layout_master_data *llmd = com->lc_data; + + if (unlikely(llmd->llmd_exit)) { + rc = 0; + } else if (rc == -ENOTCONN || rc == -ESHUTDOWN || + rc == -ETIMEDOUT || rc == -EHOSTDOWN || + rc == -EHOSTUNREACH) { + /* If cannot touch the target server, + * mark the LFSCK as INCOMPLETE. */ CERROR("%s: Fail to talk with OST %x: rc = %d.\n", lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc); lo->ll_flags |= LF_INCOMPLETE; lo->ll_objs_skipped++; rc = 0; } else { - lo->ll_objs_failed_phase1++; + lfsck_layout_record_failure(env, lfsck, lo); } } else if (rc > 0) { LASSERTF(type > LLIT_NONE && type <= LLIT_MAX, "unknown type = %d\n", type); lo->ll_objs_repaired[type - 1]++; + if (bk->lb_param & LPF_DRYRUN && + unlikely(lo->ll_pos_first_inconsistent == 0)) + lo->ll_pos_first_inconsistent = + lfsck->li_obj_oit->do_index_ops->dio_it.store(env, + lfsck->li_di_oit); } up_write(&com->lc_sem); @@ -2401,7 +3458,7 @@ static int lfsck_layout_assistant(void *args) memset(lr, 0, sizeof(*lr)); lr->lr_event = LE_START; lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN | - LSV_ASYNC_WINDOWS; + LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ; lr->lr_speed = bk->lb_speed_limit; lr->lr_version = bk->lb_version; lr->lr_param = bk->lb_param; @@ -2426,7 +3483,8 @@ static int lfsck_layout_assistant(void *args) while (!list_empty(&llmd->llmd_req_list)) { bool wakeup = false; - if (unlikely(llmd->llmd_exit)) + if (unlikely(llmd->llmd_exit || + !thread_is_running(mthread))) GOTO(cleanup1, rc = llmd->llmd_post_result); llr = list_entry(llmd->llmd_req_list.next, @@ -2440,11 +3498,15 @@ static int lfsck_layout_assistant(void *args) rc = lfsck_layout_assistant_handle_one(env, com, llr); spin_lock(&llmd->llmd_lock); list_del_init(&llr->llr_list); - if (bk->lb_async_windows != 0 && - llmd->llmd_prefetched >= bk->lb_async_windows) - wakeup = true; - llmd->llmd_prefetched--; + /* Wake 
up the main engine thread only when the list + * is empty or half of the prefetched items have been + * handled to avoid too frequent thread schedule. */ + if (llmd->llmd_prefetched == 0 || + (bk->lb_async_windows != 0 && + bk->lb_async_windows / 2 == + llmd->llmd_prefetched)) + wakeup = true; spin_unlock(&llmd->llmd_lock); if (wakeup) wake_up_all(&mthread->t_ctl_waitq); @@ -2454,9 +3516,6 @@ static int lfsck_layout_assistant(void *args) GOTO(cleanup1, rc); } - /* Wakeup the master engine if it is waiting in checkpoint. */ - wake_up_all(&mthread->t_ctl_waitq); - l_wait_event(athread->t_ctl_waitq, !lfsck_layout_req_empty(llmd) || llmd->llmd_exit || @@ -2500,6 +3559,9 @@ static int lfsck_layout_assistant(void *args) com->lc_time_last_checkpoint + cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL); + /* flush all async updating before handling orphan. */ + dt_sync(env, lfsck->li_next); + while (llmd->llmd_in_double_scan) { struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs; @@ -2638,7 +3700,7 @@ cleanup2: /* Under force exit case, some requests may be just freed without * verification, those objects should be re-handled when next run. * So not update the on-disk tracing file under such case. 
*/ - if (!llmd->llmd_exit) + if (llmd->llmd_in_double_scan && !llmd->llmd_exit) rc1 = lfsck_layout_double_scan_result(env, com, rc); fini: @@ -2887,6 +3949,209 @@ lfsck_layout_slave_notify_master(const struct lu_env *env, RETURN_EXIT; } +/* + * \ret -ENODATA: unrecognized stripe + * \ret = 0 : recognized stripe + * \ret < 0 : other failures + */ +static int lfsck_layout_master_check_pairs(const struct lu_env *env, + struct lfsck_component *com, + struct lu_fid *cfid, + struct lu_fid *pfid) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_buf *buf = &info->lti_big_buf; + struct ost_id *oi = &info->lti_oi; + struct dt_object *obj; + struct lov_mds_md_v1 *lmm; + struct lov_ost_data_v1 *objs; + __u32 idx = pfid->f_stripe_idx; + __u32 magic; + int rc = 0; + int i; + __u16 count; + ENTRY; + + pfid->f_ver = 0; + obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + dt_read_lock(env, obj, 0); + if (unlikely(!dt_object_exists(obj))) + GOTO(unlock, rc = -ENOENT); + + rc = lfsck_layout_get_lovea(env, obj, buf, NULL); + if (rc < 0) + GOTO(unlock, rc); + + if (rc == 0) + GOTO(unlock, rc = -ENODATA); + + lmm = buf->lb_buf; + rc = lfsck_layout_verify_header(lmm); + if (rc != 0) + GOTO(unlock, rc); + + /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has + * been verified in lfsck_layout_verify_header() already. If some + * new magic introduced in the future, then layout LFSCK needs to + * be updated also. */ + magic = le32_to_cpu(lmm->lmm_magic); + if (magic == LOV_MAGIC_V1) { + objs = &lmm->lmm_objects[0]; + } else { + LASSERT(magic == LOV_MAGIC_V3); + objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; + } + + fid_to_ostid(cfid, oi); + count = le16_to_cpu(lmm->lmm_stripe_count); + for (i = 0; i < count; i++, objs++) { + struct ost_id oi2; + + ostid_le_to_cpu(&objs->l_ost_oi, &oi2); + if (memcmp(oi, &oi2, sizeof(*oi)) == 0) + GOTO(unlock, rc = (i != idx ? 
-ENODATA : 0)); + } + + GOTO(unlock, rc = -ENODATA); + +unlock: + dt_read_unlock(env, obj); + lu_object_put(env, &obj->do_lu); + + return rc; +} + +/* + * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given + * MDT-object/OST-object pairs match or not to avoid transferring the MDT-object + * layout EA from MDT to OST. On one hand, the OST does not need to understand + * the layout EA structure; on the other hand, it may cause trouble when + * transferring a large layout EA from MDT to OST via normal OUT RPC. + * + * \ret > 0: unrecognized stripe + * \ret = 0: recognized stripe + * \ret < 0: other failures + */ +static int lfsck_layout_slave_check_pairs(const struct lu_env *env, + struct lfsck_component *com, + struct lu_fid *cfid, + struct lu_fid *pfid) +{ + struct lfsck_instance *lfsck = com->lc_lfsck; + struct obd_device *obd = lfsck->li_obd; + struct seq_server_site *ss = + lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); + struct obd_export *exp = NULL; + struct ptlrpc_request *req = NULL; + struct lfsck_request *lr; + struct lu_seq_range range = { 0 }; + int rc = 0; + ENTRY; + + if (unlikely(fid_is_idif(pfid))) + RETURN(1); + + fld_range_set_any(&range); + rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range); + if (rc != 0) + RETURN(rc == -ENOENT ?
1 : rc); + + if (unlikely(!fld_range_is_mdt(&range))) + RETURN(1); + + exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index); + if (unlikely(exp == NULL)) + RETURN(1); + + if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK)) + GOTO(out, rc = -EOPNOTSUPP); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY); + if (rc != 0) { + ptlrpc_request_free(req); + + GOTO(out, rc); + } + + lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST); + memset(lr, 0, sizeof(*lr)); + lr->lr_event = LE_PAIRS_VERIFY; + lr->lr_active = LT_LAYOUT; + lr->lr_fid = *cfid; /* OST-object itself FID. */ + lr->lr_fid2 = *pfid; /* The claimed parent FID. */ + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + ptlrpc_req_finished(req); + + if (rc == -ENOENT || rc == -ENODATA) + rc = 1; + + GOTO(out, rc); + +out: + if (exp != NULL) + class_export_put(exp); + + return rc; +} + +static int lfsck_layout_slave_repair_pfid(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_request *lr) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct filter_fid *ff = &info->lti_new_pfid; + struct lu_buf *buf; + struct dt_device *dev = com->lc_lfsck->li_bottom; + struct dt_object *obj; + struct thandle *th = NULL; + int rc = 0; + ENTRY; + + obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2); + buf = lfsck_buf_get(env, ff, sizeof(*ff)); + dt_write_lock(env, obj, 0); + if (unlikely(!dt_object_exists(obj))) + GOTO(unlock, rc = 0); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(unlock, rc = PTR_ERR(th)); + + rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_xattr_set(env, obj, 
buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA); + + GOTO(stop, rc); + +stop: + dt_trans_stop(env, dev, th); + +unlock: + dt_write_unlock(env, obj); + lu_object_put(env, &obj->do_lu); + + return rc; +} + /* layout APIs */ static int lfsck_layout_reset(const struct lu_env *env, @@ -2924,14 +4189,7 @@ static void lfsck_layout_fail(const struct lu_env *env, down_write(&com->lc_sem); if (new_checked) com->lc_new_checked++; - lo->ll_objs_failed_phase1++; - if (lo->ll_pos_first_inconsistent == 0) { - struct lfsck_instance *lfsck = com->lc_lfsck; - - lo->ll_pos_first_inconsistent = - lfsck->li_obj_oit->do_index_ops->dio_it.store(env, - lfsck->li_di_oit); - } + lfsck_layout_record_failure(env, com->lc_lfsck, lo); up_write(&com->lc_sem); } @@ -3025,6 +4283,9 @@ static int lfsck_layout_prep(const struct lu_env *env, int rc; rc = lfsck_layout_reset(env, com, false); + if (rc == 0) + rc = lfsck_set_param(env, lfsck, start, true); + if (rc != 0) return rc; } @@ -3075,13 +4336,26 @@ static int lfsck_layout_slave_prep(const struct lu_env *env, struct lfsck_start_param *lsp) { struct lfsck_layout_slave_data *llsd = com->lc_data; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_layout *lo = com->lc_file_ram; struct lfsck_start *start = lsp->lsp_start; int rc; rc = lfsck_layout_prep(env, com, start); - if (rc != 0 || !lsp->lsp_index_valid) + if (rc != 0) return rc; + if (lo->ll_flags & LF_CRASHED_LASTID && + list_empty(&llsd->llsd_master_list)) { + LASSERT(lfsck->li_out_notify != NULL); + + lfsck->li_out_notify(env, lfsck->li_out_notify_data, + LE_LASTID_REBUILDING); + } + + if (!lsp->lsp_index_valid) + return 0; + rc = lfsck_layout_llst_add(llsd, lsp->lsp_index); if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) { LASSERT(!llsd->llsd_rbtree_valid); @@ -3103,7 +4377,8 @@ static int lfsck_layout_master_prep(const struct lu_env *env, struct ptlrpc_thread *mthread = &lfsck->li_thread; struct ptlrpc_thread *athread = &llmd->llmd_thread; struct lfsck_thread_args *lta; 
- long rc; + struct task_struct *task; + int rc; ENTRY; rc = lfsck_layout_prep(env, com, lsp->lsp_start); @@ -3122,10 +4397,11 @@ static int lfsck_layout_master_prep(const struct lu_env *env, if (IS_ERR(lta)) RETURN(PTR_ERR(lta)); - rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout")); - if (IS_ERR_VALUE(rc)) { + task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); CERROR("%s: Cannot start LFSCK layout assistant thread: " - "rc = %ld\n", lfsck_lfsck2name(lfsck), rc); + "rc = %d\n", lfsck_lfsck2name(lfsck), rc); lfsck_thread_args_fini(lta); } else { struct l_wait_info lwi = { 0 }; @@ -3178,7 +4454,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env, * be updated also. */ magic = le32_to_cpu(lmm->lmm_magic); if (magic == LOV_MAGIC_V1) { - objs = &(lmm->lmm_objects[0]); + objs = &lmm->lmm_objects[0]; } else { LASSERT(magic == LOV_MAGIC_V3); objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0]; @@ -3194,6 +4470,9 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env, le32_to_cpu(objs->l_ost_idx); bool wakeup = false; + if (unlikely(lovea_slot_is_dummy(objs))) + continue; + l_wait_event(mthread->t_ctl_waitq, bk->lb_async_windows == 0 || llmd->llmd_prefetched < bk->lb_async_windows || @@ -3267,7 +4546,7 @@ next: down_write(&com->lc_sem); com->lc_new_checked++; if (rc < 0) - lo->ll_objs_failed_phase1++; + lfsck_layout_record_failure(env, lfsck, lo); up_write(&com->lc_sem); if (cobj != NULL && !IS_ERR(cobj)) @@ -3337,6 +4616,8 @@ again: buf->lb_len = rc; lmm = buf->lb_buf; rc = lfsck_layout_verify_header(lmm); + /* If the LOV EA crashed, then it is possible to be rebuilt later + * when handle orphan OST-objects. 
*/ + if (rc != 0) GOTO(out, rc); @@ -3414,7 +4695,7 @@ out: down_write(&com->lc_sem); com->lc_new_checked++; if (rc < 0) - lo->ll_objs_failed_phase1++; + lfsck_layout_record_failure(env, lfsck, lo); up_write(&com->lc_sem); } buf->lb_len = buflen; @@ -3438,6 +4719,17 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env, LASSERT(llsd != NULL); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) && + cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) { + struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(1), + NULL, NULL); + struct ptlrpc_thread *thread = &lfsck->li_thread; + + l_wait_event(thread->t_ctl_waitq, + !thread_is_running(thread), + &lwi); + } + lfsck_rbtree_update_bitmap(env, com, fid, false); down_write(&com->lc_sem); @@ -3750,7 +5042,8 @@ static int lfsck_layout_dump(const struct lu_env *env, const struct dt_it_ops *iops; cfs_duration_t duration = cfs_time_current() - lfsck->li_time_last_checkpoint; - __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked; + __u64 checked = lo->ll_objs_checked_phase1 + + com->lc_new_checked; __u64 speed = checked; __u64 new_checked = com->lc_new_checked * HZ; __u32 rtime = lo->ll_run_time_phase1 + @@ -3801,31 +5094,36 @@ static int lfsck_layout_dump(const struct lu_env *env, } else if (lo->ll_status == LS_SCANNING_PHASE2) { cfs_duration_t duration = cfs_time_current() - lfsck->li_time_last_checkpoint; - __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked; - __u64 speed = checked; + __u64 checked = lo->ll_objs_checked_phase2 + + com->lc_new_checked; + __u64 speed1 = lo->ll_objs_checked_phase1; + __u64 speed2 = checked; __u64 new_checked = com->lc_new_checked * HZ; - __u32 rtime = lo->ll_run_time_phase1 + + __u32 rtime = lo->ll_run_time_phase2 + cfs_duration_sec(duration + HALF_SEC); if (duration != 0) do_div(new_checked, duration); + if (lo->ll_run_time_phase1 != 0) + do_div(speed1, lo->ll_run_time_phase1); if (rtime != 0) - do_div(speed, rtime); + do_div(speed2, rtime); rc = snprintf(buf, len,
"checked_phase1: "LPU64"\n" "checked_phase2: "LPU64"\n" "run_time_phase1: %u seconds\n" "run_time_phase2: %u seconds\n" "average_speed_phase1: "LPU64" items/sec\n" - "average_speed_phase2: N/A\n" - "real-time_speed_phase1: "LPU64" items/sec\n" - "real-time_speed_phase2: N/A\n" + "average_speed_phase2: "LPU64" items/sec\n" + "real-time_speed_phase1: N/A\n" + "real-time_speed_phase2: "LPU64" items/sec\n" "current_position: "DFID"\n", + lo->ll_objs_checked_phase1, checked, - lo->ll_objs_checked_phase2, + lo->ll_run_time_phase1, rtime, - lo->ll_run_time_phase2, - speed, + speed1, + speed2, new_checked, PFID(&com->lc_fid_latest_scanned_phase2)); if (rc <= 0) @@ -4070,6 +5368,15 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env, bool fail = false; ENTRY; + if (lr->lr_event == LE_PAIRS_VERIFY) { + int rc; + + rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid, + &lr->lr_fid2); + + RETURN(rc); + } + if (lr->lr_event != LE_PHASE1_DONE && lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT) @@ -4084,7 +5391,7 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env, if (ltd == NULL) { spin_unlock(&ltds->ltd_lock); - RETURN(-ENODEV); + RETURN(-ENXIO); } list_del_init(&ltd->ltd_layout_phase_list); @@ -4158,20 +5465,59 @@ static int lfsck_layout_slave_in_notify(const struct lu_env *env, struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_layout_slave_data *llsd = com->lc_data; struct lfsck_layout_slave_target *llst; + int rc; ENTRY; - if (lr->lr_event == LE_FID_ACCESSED) { + switch (lr->lr_event) { + case LE_FID_ACCESSED: lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true); - RETURN(0); - } + case LE_CONDITIONAL_DESTROY: + rc = lfsck_layout_slave_conditional_destroy(env, com, lr); + RETURN(rc); + case LE_PAIRS_VERIFY: { + lr->lr_status = LPVS_INIT; + /* Firstly, if the MDT-object which is claimed via OST-object + * local stored PFID xattr recognizes the OST-object, then it + * must be that the client given PFID is wrong.
*/ + rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid, + &lr->lr_fid3); + if (rc <= 0) + RETURN(0); + + lr->lr_status = LPVS_INCONSISTENT; + /* The OST-object local stored PFID xattr is stale. We need to + * check whether the MDT-object that is claimed via the client + * given PFID information recognizes the OST-object or not. If + * matches, then need to update the OST-object's PFID xattr. */ + rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid, + &lr->lr_fid2); + /* For rc < 0 case: + * We are not sure whether the client given PFID information + * is correct or not, do nothing to avoid improper fixing. + * + * For rc > 0 case: + * The client given PFID information is also invalid, we can + * NOT fix the OST-object inconsistency. + */ + if (rc != 0) + RETURN(rc); - if (lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT) + lr->lr_status = LPVS_INCONSISTENT_TOFIX; + rc = lfsck_layout_slave_repair_pfid(env, com, lr); + + RETURN(rc); + } + case LE_PHASE2_DONE: + case LE_PEER_EXIT: + break; + default: RETURN(-EINVAL); + } llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true); if (llst == NULL) - RETURN(-ENODEV); + RETURN(-ENXIO); lfsck_layout_llst_put(llst); if (list_empty(&llsd->llsd_master_list)) @@ -4603,7 +5949,7 @@ static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env, lfsck = lfsck_instance_find(dev, true, false); if (unlikely(lfsck == NULL)) - RETURN(ERR_PTR(-ENODEV)); + RETURN(ERR_PTR(-ENXIO)); com = lfsck_component_find(lfsck, LT_LAYOUT); if (unlikely(com == NULL)) @@ -4619,10 +5965,10 @@ static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env, it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false); if (it->loi_llst == NULL) - GOTO(out, rc = -ENODEV); + GOTO(out, rc = -ENXIO); if (dev->dd_record_fid_accessed) { - /* The first iteratino against the rbtree, scan the whole rbtree + /* The first iteration against the rbtree, scan the whole rbtree * to remove the nodes which do NOT need to be 
handled. */ write_lock(&llsd->llsd_rb_lock); if (dev->dd_record_fid_accessed) { @@ -4863,10 +6209,10 @@ again1: GOTO(out, rc = -EINVAL); fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent); - /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver, - * instead, it is the OST-object index in its parent MDT-object - * layout EA. */ - save = rec->lor_fid.f_ver; + /* Currently, the filter_fid::ff_parent::f_ver is not the real parent + * MDT-object's FID::f_ver, instead it is the OST-object index in its + * parent MDT-object's layout EA. */ + save = rec->lor_fid.f_stripe_idx; rec->lor_fid.f_ver = 0; rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx); /* If the orphan OST-object does not claim the MDT, then next. @@ -4880,7 +6226,7 @@ again1: goto again1; } - rec->lor_fid.f_ver = save; + rec->lor_fid.f_stripe_idx = save; rec->lor_uid = la->la_uid; rec->lor_gid = la->la_gid;