Whamcloud - gitweb
LU-5519 lfsck: repair master LMV for striped directory
[fs/lustre-release.git] / lustre / lfsck / lfsck_striped_dir.c
index c6da886..7a4c09f 100644 (file)
 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
 {
        if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
-               if (llmv->ll_lslr != NULL)
+               if (llmv->ll_inline) {
+                       struct lfsck_lmv_unit   *llu;
+                       struct lfsck_instance   *lfsck;
+
+                       llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
+                       lfsck = llu->llu_lfsck;
+
+                       spin_lock(&lfsck->li_lock);
+                       list_del(&llu->llu_link);
+                       spin_unlock(&lfsck->li_lock);
+
+                       lfsck_object_put(env, llu->llu_obj);
+
+                       LASSERT(llmv->ll_lslr != NULL);
+
                        OBD_FREE_LARGE(llmv->ll_lslr,
-                               sizeof(struct lfsck_slave_lmv_rec) *
-                               llmv->ll_stripes_allocated);
+                                      sizeof(*llmv->ll_lslr) *
+                                      llmv->ll_stripes_allocated);
+                       OBD_FREE_PTR(llu);
+               } else {
+                       if (llmv->ll_lslr != NULL)
+                               OBD_FREE_LARGE(llmv->ll_lslr,
+                                       sizeof(*llmv->ll_lslr) *
+                                       llmv->ll_stripes_allocated);
+
+                       OBD_FREE_PTR(llmv);
+               }
+       }
+}
+
+/**
+ * Mark the specified directory as read-only by set LUSTRE_IMMUTABLE_FL.
+ *
+ * The caller has taken the ldlm lock on the @obj already.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the object to be handled
+ * \param[in] del_lmv  true if need to drop the LMV EA
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero for succeed
+ * \retval             negative error number on failure
+ */
+static int lfsck_disable_master_lmv(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *obj, bool del_lmv)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_attr                  *la     = &info->lti_la;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
+       struct thandle                  *th     = NULL;
+       int                              rc     = 0;
+       ENTRY;
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       if (del_lmv) {
+               rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       la->la_valid = LA_FLAGS;
+       rc = dt_declare_attr_set(env, obj, la, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock, rc = 1);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 0);
+
+       if (del_lmv) {
+               rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th, BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(unlock, rc);
+       }
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
+               la->la_valid = LA_FLAGS;
+               la->la_flags |= LUSTRE_IMMUTABLE_FL;
+               rc = dt_attr_set(env, obj, la, th, BYPASS_CAPA);
+       }
+
+       GOTO(unlock, rc);
 
-               OBD_FREE_PTR(llmv);
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
+              "the striped directory "DFID" as read-only: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       if (rc <= 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+               if (rc == 0)
+                       ns->ln_striped_dirs_disabled++;
        }
+
+       return rc;
 }
 
 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
@@ -411,8 +521,281 @@ log:
 }
 
 /**
+ * Check whether there are non-shard objects under the striped directory.
+ *
+ * If the master MDT-object of the striped directory lost its master LMV EA,
+ * then before the LFSCK repaired the striped directory, some ones may have
+ * created some non-shard objects under the master MDT-object. If such case
+ * happend, then the LFSCK cannot re-generate the lost master LMV EA to keep
+ * those non-shard objects to be visible to client.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the master MDT-object to be checked
+ * \param[in] cfid     the shard's FID used for verification
+ * \param[in] cidx     the shard's index used for verification
+ *
+ * \retval             positive number if not allow to re-generate LMV EA
+ * \retval             zero if allow to re-generate LMV EA
+ * \retval             negative error number on failure
+ */
+static int lfsck_allow_set_master_lmv(const struct lu_env *env,
+                                     struct lfsck_component *com,
+                                     struct dt_object *obj,
+                                     const struct lu_fid *cfid, __u32 cidx)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_fid                   *tfid   = &info->lti_fid3;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lu_dirent                *ent    =
+                       (struct lu_dirent *)info->lti_key;
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *di;
+       __u64                            cookie;
+       __u32                            args;
+       int                              rc;
+       __u16                            type;
+       ENTRY;
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               RETURN(-ENOTDIR);
+
+       /* Check whether the shard and the master MDT-object matches or not. */
+       snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
+                PFID(cfid), cidx);
+       rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
+                      (const struct dt_key *)info->lti_tmpbuf, BYPASS_CAPA);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (!lu_fid_eq(tfid, cfid))
+               RETURN(-ENOENT);
+
+       args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
+       iops = &obj->do_index_ops->dio_it;
+       di = iops->init(env, obj, args, BYPASS_CAPA);
+       if (IS_ERR(di))
+               RETURN(PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (rc == 0)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       do {
+               rc = iops->rec(env, di, (struct dt_rec *)ent, args);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &cookie, &type);
+
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* skip dot and dotdot entries */
+               if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
+                       goto next;
+
+               /* If the subdir name does not match the shard name rule, then
+                * it is quite possible that it is NOT a shard, but created by
+                * someone after the master MDT-object lost the master LMV EA.
+                * But it is also possible that the subdir name entry crashed,
+                * under such double failure cases, the LFSCK cannot know how
+                * to repair the inconsistency. For data safe, the LFSCK will
+                * mark the master MDT-object as read-only. The administrator
+                * can fix the bad shard name manually, then run LFSCK again.
+                *
+                * XXX: If the subdir name matches the shard name rule, but it
+                *      is not a real shard of the striped directory, instead,
+                *      it was created by someone after the master MDT-object
+                *      lost the LMV EA, then re-generating the master LMV EA
+                *      will cause such subdir to be invisible to client, and
+                *      if its index occupies some lost shard index, then the
+                *      LFSCK will use it to replace the bad shard, and cause
+                *      the subdir (itself) to be invisible for ever. */
+               if (lfsck_shard_name_to_index(env, ent->lde_name,
+                               ent->lde_namelen, type, &ent->lde_fid) < 0)
+                       GOTO(out, rc = 1);
+
+next:
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+       GOTO(out, rc = 0);
+
+out:
+       iops->put(env, di);
+       iops->fini(env, di);
+
+       return rc;
+}
+
+/**
+ * Notify remote LFSCK instance that the object's LMV EA has been updated.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the object on which the LMV EA will be set
+ * \param[in] event    indicate either master or slave LMV EA has been updated
+ * \param[in] flags    indicate which element(s) in the LMV EA has been updated
+ * \param[in] index    the MDT index on which the LFSCK instance to be notified
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero for succeed
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
+                                            struct lfsck_component *com,
+                                            struct dt_object *obj,
+                                            __u32 event, __u32 flags,
+                                            __u32 index)
+{
+       struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
+       const struct lu_fid             *fid    = lfsck_dto2fid(obj);
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_tgt_desc           *ltd    = NULL;
+       struct ptlrpc_request           *req    = NULL;
+       int                              rc;
+       ENTRY;
+
+       ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
+       if (ltd == NULL)
+               GOTO(out, rc = -ENODEV);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
+                                  &RQF_LFSCK_NOTIFY);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
+       if (rc != 0) {
+               ptlrpc_request_free(req);
+
+               GOTO(out, rc);
+       }
+
+       lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_event = event;
+       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+       lr->lr_active = LFSCK_TYPE_NAMESPACE;
+       lr->lr_fid = *fid;
+       lr->lr_flags = flags;
+
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       ptlrpc_req_finished(req);
+
+       GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
+
+out:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
+              "object "DFID" on MDT %x remotely with event %u, flags %u: "
+              "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
+              event, flags, rc);
+
+       if (ltd != NULL)
+               lfsck_tgt_put(ltd);
+
+       return rc;
+}
+
+/**
+ * Generate request for local LFSCK instance to rescan the striped directory.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the striped directory to be rescanned
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero for succeed
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
+                                           struct lfsck_component *com,
+                                           struct dt_object *obj)
+{
+       struct lfsck_instance      *lfsck = com->lc_lfsck;
+       struct lfsck_namespace     *ns    = com->lc_file_ram;
+       struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
+       struct lfsck_lmv_unit      *llu;
+       struct lfsck_lmv           *llmv;
+       struct lfsck_slave_lmv_rec *lslr;
+       int                         count = 0;
+       int                         rc;
+       ENTRY;
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               RETURN(0);
+
+       rc = lfsck_read_stripe_lmv(env, obj, lmv4);
+       if (rc != 0)
+               RETURN(rc);
+
+       OBD_ALLOC_PTR(llu);
+       if (unlikely(llu == NULL))
+               RETURN(-ENOMEM);
+
+       if (lmv4->lmv_stripe_count < 1)
+               count = LFSCK_LMV_DEF_STRIPES;
+       else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
+               count = LFSCK_LMV_MAX_STRIPES;
+       else
+               count = lmv4->lmv_stripe_count;
+
+       OBD_ALLOC_LARGE(lslr, sizeof(struct lfsck_slave_lmv_rec) * count);
+       if (lslr == NULL) {
+               OBD_FREE_PTR(llu);
+
+               RETURN(-ENOMEM);
+       }
+
+       INIT_LIST_HEAD(&llu->llu_link);
+       llu->llu_lfsck = lfsck;
+       llu->llu_obj = lfsck_object_get(obj);
+       llmv = &llu->llu_lmv;
+       llmv->ll_lmv_master = 1;
+       llmv->ll_inline = 1;
+       atomic_set(&llmv->ll_ref, 1);
+       llmv->ll_stripes_allocated = count;
+       llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
+       llmv->ll_lslr = lslr;
+       llmv->ll_lmv = *lmv4;
+
+       down_write(&com->lc_sem);
+       if (ns->ln_status != LS_SCANNING_PHASE1 &&
+           ns->ln_status != LS_SCANNING_PHASE2) {
+               ns->ln_striped_dirs_skipped++;
+               up_write(&com->lc_sem);
+               lfsck_lmv_put(env, llmv);
+       } else {
+               ns->ln_striped_dirs_repaired++;
+               spin_lock(&lfsck->li_lock);
+               list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
+               spin_unlock(&lfsck->li_lock);
+               up_write(&com->lc_sem);
+       }
+
+       RETURN(0);
+}
+
+/**
  * Set master LMV EA for the specified striped directory.
  *
+ * First, if the master MDT-object of a striped directory lost its LMV EA,
+ * then there may be some users have created some files under the master
+ * MDT-object directly. Under such case, the LFSCK cannot re-generate LMV
+ * EA for the master MDT-object, because we should keep the existing files
+ * to be visible to client. Then the LFSCK will mark the striped directory
+ * as read-only and keep it there to be handled by administrator manually.
+ *
+ * If nobody has created files under the master MDT-object of the striped
+ * directory, then we will set the master LMV EA and generate a new rescan
+ * (the striped directory) request that will be handled later by the LFSCK
+ * instance on the MDT later.
+ *
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  * \param[in] dir      pointer to the object on which the LMV EA will be set
@@ -463,8 +846,6 @@ static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
                pidx = lfsck_dev_idx(lfsck->li_bottom);
        }
 
-       /* XXX: it will be improved with subsequent patches landed. */
-
        rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
                              LCK_EX);
@@ -472,14 +853,48 @@ static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
                GOTO(log, rc);
 
        rc = lfsck_read_stripe_lmv(env, obj, lmv3);
-       if (rc != 0)
+       if (rc == -ENODATA) {
+               if (!(flags & LEF_SET_LMV_ALL))
+                       GOTO(log, rc);
+
+               *lmv3 = *lmv;
+       } else if (rc == 0) {
+               if (flags & LEF_SET_LMV_ALL)
+                       GOTO(log, rc = 1);
+
+               if (flags & LEF_SET_LMV_HASH)
+                       lmv3->lmv_hash_type = lmv->lmv_hash_type;
+       } else {
                GOTO(log, rc);
+       }
 
-       lmv3->lmv_hash_type = lmv->lmv_hash_type;
        lmv3->lmv_magic = LMV_MAGIC;
        lmv3->lmv_master_mdt_index = pidx;
 
+       if (flags & LEF_SET_LMV_ALL) {
+               rc = lfsck_allow_set_master_lmv(env, com, obj, cfid, cidx);
+               if (rc > 0) {
+                       rc = lfsck_disable_master_lmv(env, com, obj, false);
+
+                       GOTO(log, rc = (rc == 0 ? 1 : rc));
+               }
+
+               if (rc < 0)
+                       GOTO(log, rc);
+
+               /* To indicate that the master has ever lost LMV EA. */
+               lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
+       }
+
        rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
+       if (rc == 0 && flags & LEF_SET_LMV_ALL) {
+               if (dt_object_remote(obj))
+                       rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
+                                               LE_SET_LMV_MASTER, 0, pidx);
+               else
+                       rc = lfsck_namespace_notify_lmv_master_local(env, com,
+                                                                    obj);
+       }
 
        GOTO(log, rc);
 
@@ -567,6 +982,147 @@ log:
 }
 
 /**
+ * Scan the shard of a striped directory for name hash verification.
+ *
+ * During the first-stage scanning, if the LFSCK cannot make sure whether
+ * the shard of a stripe directory contains valid slave LMV EA or not, then
+ * it will skip the name hash verification for this shard temporarily, and
+ * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
+ * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
+ * Then in the second-stage scanning, the shard will be re-scanned, and for
+ * every name entry under the shard, the name hash will be verified, and for
+ * unmatched name entry, the LFSCK will try to fix it.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the directory object to be handled
+ *
+ * \retval             positive number for scanning successfully
+ * \retval             zero for the scanning is paused
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_scan_shard(const struct lu_env *env,
+                              struct lfsck_component *com,
+                              struct dt_object *child)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       struct lu_dirent                *ent    =
+                       (struct lu_dirent *)info->lti_key;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_lmv                *llmv   = NULL;
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *di;
+       __u64                            cookie;
+       __u32                            args;
+       int                              rc;
+       __u16                            type;
+       ENTRY;
+
+       rc = lfsck_read_stripe_lmv(env, child, lmv);
+       if (rc != 0)
+               RETURN(rc == -ENODATA ? 1 : rc);
+
+       if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
+               RETURN(1);
+
+       if (unlikely(!dt_try_as_dir(env, child)))
+               RETURN(-ENOTDIR);
+
+       OBD_ALLOC_PTR(llmv);
+       if (llmv == NULL)
+               RETURN(-ENOMEM);
+
+       llmv->ll_lmv_slave = 1;
+       llmv->ll_lmv_verified = 1;
+       llmv->ll_lmv = *lmv;
+       atomic_set(&llmv->ll_ref, 1);
+
+       args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
+       iops = &child->do_index_ops->dio_it;
+       di = iops->init(env, child, args, BYPASS_CAPA);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (rc == 0)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi;
+
+                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+                                         NULL, NULL);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+
+                       if (unlikely(!thread_is_running(thread)))
+                               GOTO(out, rc = 0);
+               }
+
+               rc = iops->rec(env, di, (struct dt_rec *)ent, args);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &cookie, &type);
+
+               if (rc != 0) {
+                       if (bk->lb_param & LPF_FAILOUT)
+                               GOTO(out, rc);
+
+                       goto next;
+               }
+
+               /* skip dot and dotdot entries */
+               if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
+                       goto next;
+
+               if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
+                                                    ent->lde_namelen)) {
+                       ns->ln_flags |= LF_INCONSISTENT;
+                       rc = lfsck_namespace_repair_bad_name_hash(env, com,
+                                               child, llmv, ent->lde_name);
+                       if (rc >= 0)
+                               ns->ln_name_hash_repaired++;
+               }
+
+               if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                       GOTO(out, rc);
+
+               /* Rate control. */
+               lfsck_control_speed(lfsck);
+               if (unlikely(!thread_is_running(thread)))
+                       GOTO(out, rc = 0);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
+                       spin_lock(&lfsck->li_lock);
+                       thread_set_flags(thread, SVC_STOPPING);
+                       spin_unlock(&lfsck->li_lock);
+
+                       GOTO(out, rc = -EINVAL);
+               }
+
+next:
+               rc = iops->next(env, di);
+       }
+
+       GOTO(out, rc);
+
+out:
+       iops->put(env, di);
+       iops->fini(env, di);
+       lfsck_lmv_put(env, llmv);
+
+       return rc;
+}
+
+/**
  * Verify the slave object's (of striped directory) LMV EA.
  *
  * For the slave object of a striped directory, before traversing the shard
@@ -630,9 +1186,12 @@ int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
                /* If the parent has no LMV EA, then it maybe because:
                 * 1) The parent lost the LMV EA.
                 * 2) The child claims a wrong (slave) LMV EA. */
-
-               /* XXX: to be improved. */
-               rc = 0;
+               if (rc == -ENODATA)
+                       rc = lfsck_namespace_set_lmv_master(env, com, parent,
+                                       clmv, cfid, clmv->lmv_master_mdt_index,
+                                       LEF_SET_LMV_ALL);
+               else
+                       rc = 0;
 
                rc1 = lfsck_namespace_trace_update(env, com, cfid,
                                                   LNTF_UNCERTAIN_LMV, true);