Whamcloud - gitweb
LU-5519 lfsck: repair slave LMV for striped directory
[fs/lustre-release.git] / lustre / lfsck / lfsck_striped_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_striped_dir.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 /*
32  * About the verification for striped directory. Some rules and assumptions:
33  *
34  * 1) lmv_magic: The magic may be wrong. But it is almost impossible (1/2^32
35  *    probability) that a master LMV EA claims as a slave LMV EA by wrong,
36  *    so we can ignore such race case and the reverse case.
37  *
38  * 2) lmv_master_mdt_index: The master index can be self-verified by compared
39  *    with the MDT index directly. The slave stripe index can be verified by
40  *    compared with the file name. Although both the name entry and the LMV EA
41  *    can be wrong, it is almost impossible that they hit the same bad data
42  *    So if they match each other, then trust them. Similarly, for the shard,
43  *    it stores index in both slave LMV EA and in linkEA, if the two copies
44  *    match, then trust them.
45  *
46  * 3) lmv_hash_type: The valid hash type should be LMV_HASH_TYPE_ALL_CHARS or
47  *    LMV_HASH_TYPE_FNV_1A_64. If the LFSCK instance on some slave finds that
48  *    the name hash against the hash function does not match the MDT, then it
49  *    will change the master LMV EA hash type as LMV_HASH_TYPE_UNKNOWN. With
50  *    such hash type, the whole striped directory still can be accessed via
51  *    lookup/readdir, and also support unlink, but cannot add new name entry.
52  *
53  * 3.1) If the master hash type is one of the valid values, then trust the
54  *      master LMV EA. Because:
55  *
56  * 3.1.1) The master hash type is visible to the client and used by the client.
57  *
58  * 3.1.2) For a given name, different hash types may map the name entry to the
59  *        same MDT. So simply checking one name entry or some name entries may
60  *        cannot verify whether the hash type is correct or not.
61  *
62  * 3.1.3) Different shards can claim different hash types, it is not easy to
63  *        distinguish which ones are correct. Even though the master is wrong,
64  *        as the LFSCK processing, some LFSCK instance on other MDT may finds
65  *        unmatched name hash, then it will change the master hash type to
66  *        LMV_HASH_TYPE_UNKNOWN as described above. The worst case is euqal
67  *        to the case without the LFSCK.
68  *
69  * 3.2) If the master hash type is invalid, nor LMV_HASH_TYPE_UNKNOWN, then
70  *      trust the first shard with valid hash type (ALL_CHARS or FNV_1A_64).
71  *      If the shard is also worng, means there are double failures, then as
72  *      the LFSCK processing, other LFSCK instances on the other MDTs may
73  *      find unmatched name hash, and then, the master hash type will be
74  *      changed to LMV_HASH_TYPE_UNKNOWN as described in the 3).
75  *
76  * 3.3) If the master hash type is LMV_HASH_TYPE_UNKNOWN, then it is possible
77  *      that some other LFSCK instance on other MDT found bad name hash, then
78  *      changed the master hash type to LMV_HASH_TYPE_UNKNOWN as described in
79  *      the 3). But it also maybe because of data corruption in master LMV EA.
80  *      To make such two cases to be distinguishable, when the LFSCK changes
81  *      the master hash type to LMV_HASH_TYPE_UNKNOWN, it will mark in the
82  *      master LMV EA (new lmv flags LMV_HASH_FLAG_BAD_TYPE). Then subsequent
83  *      LFSCK checking can distinguish them: for former case, turst the master
84  *      LMV EA with nothing to be done; otherwise, trust the first shard with
85  *      valid hash type (ALL_CHARS or FNV_1A_64) as the 3.2) does.
86  *
87  * 4) lmv_stripe_count: For a shard of a striped directory, if its index has
88  *    been verified as the 2), then the stripe count must be larger than its
89  *    index. For the master object, by scanning each shard's index, the LFSCK
90  *    can know the highest index, and the stripe count must be larger than the
91  *    known highest index. If the stipe count in the LMV EA matches above two
92  *    rules, then it is may be trustable. If both the master claimed stripe
93  *    count and the slave claimed stripe count match each own rule, but they
94  *    are not the same, then trust the master. Because the stripe count in
95  *    the master LMV EA is visible to client and used to distribute the name
96  *    entry to some shard, but the slave LMV EA is only used for verification
97  *    and invisible to client.
98  *
99  * 5) If the master LMV EA is lost, then there are two possible cases:
100  *
101  * 5.1) The slave claims slave LMV EA by wrong, means that the parent was not
102  *      a striped directory, but its sub-directory has a wrong slave LMV EA.
103  *      It is very very race case, similar as the 1), can be ignored.
104  *
105  * 5.2) The parent directory is a striped directory, but the master LMV EA
106  *      is lost or crashed. Then the LFSCK needs to re-generate the master
107  *      LMV EA: the lmv_master_mdt_index is from the MDT device index; the
108  *      lmv_hash_type is from the first valid shard; the lmv_stripe_count
109  *      will be calculated via scanning all the shards.
110  *
111  * 5.2.1) Before re-generating the master LMV EA, the LFSCK needs to check
112  *        whether someone has created some file(s) under the master object
113  *        after the master LMV EA disappear. If yes, the LFSCK will cannot
114  *        re-generate the master LMV EA, otherwise, such new created files
115  *        will be invisible to client. Under such case, the LFSCK will mark
116  *        the master object as read only (without master LMV EA). Then all
117  *        things under the master MDT-object, including those new created
118  *        files and the shards themselves, will be visibile to client. And
119  *        then the administrator can handle the bad striped directory with
120  *        more human knowledge.
121  *
122  * 5.2.2) If someone created some special sub-directory under the master
123  *        MDT-object with the same naming rule as shard name $FID:$index,
124  *        as to the LFSCK cannot detect it before re-generating the master
125  *        LMV EA, then such sub-directory itself will be invisible after
126  *        the LFSCK re-generating the master LMV EA. The sub-items under
127  *        such sub-directory are still visible to client. As the LFSCK
128  *        processing, if such sub-directory cause some conflict with other
129  *        normal shard, such as the index conflict, then the LFSCK will
130  *        remove the master LMV EA and change the master MDT-object to
131  *        read-only mode as the 5.2.1). But if there is no conflict, the
132  *        LFSCK will regard such sub-directory as a striped shard that
133  *        lost its slave LMV EA, and will re-generate slave LMV EA for it.
134  *
135  * 5.2.3) Anytime, if the LFSCK found some shards name/index conflict,
136  *        and cannot make the distinguish which one is right, then it
137  *        will remove the master LMV EA and change the MDT-object to
138  *        read-only mode as the 5.2.2).
139  */
140
141 #define DEBUG_SUBSYSTEM S_LFSCK
142
143 #include <lustre/lustre_idl.h>
144 #include <lu_object.h>
145 #include <dt_object.h>
146 #include <md_object.h>
147 #include <lustre_fid.h>
148 #include <lustre_lib.h>
149 #include <lustre_net.h>
150 #include <lustre_lmv.h>
151 #include <lustre/lustre_user.h>
152
153 #include "lfsck_internal.h"
154
155 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
156 {
157         if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
158                 if (llmv->ll_inline) {
159                         struct lfsck_lmv_unit   *llu;
160                         struct lfsck_instance   *lfsck;
161
162                         llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
163                         lfsck = llu->llu_lfsck;
164
165                         spin_lock(&lfsck->li_lock);
166                         list_del(&llu->llu_link);
167                         spin_unlock(&lfsck->li_lock);
168
169                         lfsck_object_put(env, llu->llu_obj);
170
171                         LASSERT(llmv->ll_lslr != NULL);
172
173                         OBD_FREE_LARGE(llmv->ll_lslr,
174                                        sizeof(*llmv->ll_lslr) *
175                                        llmv->ll_stripes_allocated);
176                         OBD_FREE_PTR(llu);
177                 } else {
178                         if (llmv->ll_lslr != NULL)
179                                 OBD_FREE_LARGE(llmv->ll_lslr,
180                                         sizeof(*llmv->ll_lslr) *
181                                         llmv->ll_stripes_allocated);
182
183                         OBD_FREE_PTR(llmv);
184                 }
185         }
186 }
187
188 /**
189  * Mark the specified directory as read-only by set LUSTRE_IMMUTABLE_FL.
190  *
191  * The caller has taken the ldlm lock on the @obj already.
192  *
193  * \param[in] env       pointer to the thread context
194  * \param[in] com       pointer to the lfsck component
195  * \param[in] obj       pointer to the object to be handled
196  * \param[in] del_lmv   true if need to drop the LMV EA
197  *
198  * \retval              positive number if nothing to be done
199  * \retval              zero for succeed
200  * \retval              negative error number on failure
201  */
202 static int lfsck_disable_master_lmv(const struct lu_env *env,
203                                     struct lfsck_component *com,
204                                     struct dt_object *obj, bool del_lmv)
205 {
206         struct lfsck_thread_info        *info   = lfsck_env_info(env);
207         struct lu_attr                  *la     = &info->lti_la;
208         struct lfsck_instance           *lfsck  = com->lc_lfsck;
209         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
210         struct thandle                  *th     = NULL;
211         int                              rc     = 0;
212         ENTRY;
213
214         th = dt_trans_create(env, dev);
215         if (IS_ERR(th))
216                 GOTO(log, rc = PTR_ERR(th));
217
218         if (del_lmv) {
219                 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
220                 if (rc != 0)
221                         GOTO(stop, rc);
222         }
223
224         la->la_valid = LA_FLAGS;
225         rc = dt_declare_attr_set(env, obj, la, th);
226         if (rc != 0)
227                 GOTO(stop, rc);
228
229         rc = dt_trans_start_local(env, dev, th);
230         if (rc != 0)
231                 GOTO(stop, rc);
232
233         dt_write_lock(env, obj, 0);
234         if (unlikely(lfsck_is_dead_obj(obj)))
235                 GOTO(unlock, rc = 1);
236
237         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
238                 GOTO(unlock, rc = 0);
239
240         if (del_lmv) {
241                 rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th, BYPASS_CAPA);
242                 if (rc != 0)
243                         GOTO(unlock, rc);
244         }
245
246         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
247         if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
248                 la->la_valid = LA_FLAGS;
249                 la->la_flags |= LUSTRE_IMMUTABLE_FL;
250                 rc = dt_attr_set(env, obj, la, th, BYPASS_CAPA);
251         }
252
253         GOTO(unlock, rc);
254
255 unlock:
256         dt_write_unlock(env, obj);
257
258 stop:
259         dt_trans_stop(env, dev, th);
260
261 log:
262         CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
263                "the striped directory "DFID" as read-only: rc = %d\n",
264                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
265
266         if (rc <= 0) {
267                 struct lfsck_namespace *ns = com->lc_file_ram;
268
269                 ns->ln_flags |= LF_INCONSISTENT;
270                 if (rc == 0)
271                         ns->ln_striped_dirs_disabled++;
272         }
273
274         return rc;
275 }
276
277 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
278 {
279         return lmv->lmv_stripe_count >= 1 &&
280                lmv->lmv_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
281                lmv->lmv_stripe_count > lmv->lmv_master_mdt_index &&
282                lmv_is_known_hash_type(lmv->lmv_hash_type);
283 }
284
285 /**
286  * Remove the striped directory's master LMV EA and mark it as read-only.
287  *
288  * Take ldlm lock on the striped directory before calling the
289  * lfsck_disable_master_lmv().
290  *
291  * \param[in] env       pointer to the thread context
292  * \param[in] com       pointer to the lfsck component
293  * \param[in] lnr       pointer to the namespace request that contains the
294  *                      striped directory to be handled and other information
295  *
296  * \retval              positive number if nothing to be done
297  * \retval              zero for succeed
298  * \retval              negative error number on failure
299  */
300 static int lfsck_remove_lmv(const struct lu_env *env,
301                             struct lfsck_component *com,
302                             struct lfsck_namespace_req *lnr)
303 {
304         struct dt_object        *obj    = lnr->lnr_obj;
305         struct lustre_handle     lh     = { 0 };
306         int                      rc;
307
308         lnr->lnr_lmv->ll_ignore = 1;
309         rc = lfsck_ibits_lock(env, com->lc_lfsck, obj, &lh,
310                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
311                               LCK_EX);
312         if (rc == 0) {
313                 rc = lfsck_disable_master_lmv(env, com, obj, true);
314                 lfsck_ibits_unlock(&lh, LCK_EX);
315         }
316
317         return rc;
318 }
319
320 /**
321  * Remove the name entry from the striped directory's master MDT-object.
322  *
323  * \param[in] env       pointer to the thread context
324  * \param[in] com       pointer to the lfsck component
325  * \param[in] dir       pointer to the striped directory
326  * \param[in] fid       the shard's FID which name entry will be removed
327  * \param[in] index     the shard's index which name entry will be removed
328  *
329  * \retval              positive number for repaired successfully
330  * \retval              0 if nothing to be repaired
331  * \retval              negative error number on failure
332  */
333 static int lfsck_remove_dirent(const struct lu_env *env,
334                                struct lfsck_component *com,
335                                struct dt_object *dir,
336                                const struct lu_fid *fid, __u32 index)
337 {
338         struct lfsck_thread_info        *info = lfsck_env_info(env);
339         struct dt_object                *obj;
340         int                              rc;
341
342         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
343                  PFID(fid), index);
344         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, fid);
345         if (IS_ERR(obj))
346                 return PTR_ERR(obj);
347
348         rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
349                                         info->lti_tmpbuf2, info->lti_tmpbuf2,
350                                         S_IFDIR, false, false);
351         lfsck_object_put(env, obj);
352         if (rc > 0) {
353                 struct lfsck_namespace *ns = com->lc_file_ram;
354
355                 ns->ln_dirent_repaired++;
356         }
357
358         return rc;
359 }
360
361 /**
362  * Remove old shard's name entry and refill the @lslr slot with new shard.
363  *
364  * Some old shard held the specified @lslr slot, but it is an invalid shard.
365  * This function will remove the bad shard's name entry, and refill the @lslr
366  * slot with the new shard.
367  *
368  * \param[in] env       pointer to the thread context
369  * \param[in] com       pointer to the lfsck component
370  * \param[in] lslr      pointer to lfsck_disable_master_lmv slot which content
371  *                      will be replaced by the given information
372  * \param[in] lnr       contain the shard's FID to be used to fill the
373  *                      @lslr slot, it also records the known max filled index
374  *                      and the known max stripe count
375  * \param[in] lmv       contain the slave LMV EA to be used to fill the
376  *                      @lslr slot
377  * \param[in] index     the old shard's index in the striped directory
378  * \param[in] flags     the new shard's flags in the @lslr slot
379  *
380  * \retval              zero for succeed
381  * \retval              negative error number on failure
382  */
383 static int lfsck_replace_lmv(const struct lu_env *env,
384                              struct lfsck_component *com,
385                              struct lfsck_slave_lmv_rec *lslr,
386                              struct lfsck_namespace_req *lnr,
387                              struct lmv_mds_md_v1 *lmv,
388                              __u32 index, __u32 flags)
389 {
390         int rc;
391
392         rc = lfsck_remove_dirent(env, com, lnr->lnr_obj,
393                                  &lslr->lslr_fid, index);
394         if (rc < 0)
395                 return rc;
396
397         lslr->lslr_fid = lnr->lnr_fid;
398         lslr->lslr_flags = flags;
399         if (lmv != NULL) {
400                 struct lfsck_lmv *llmv = lnr->lnr_lmv;
401
402                 lslr->lslr_stripe_count = lmv->lmv_stripe_count;
403                 lslr->lslr_index = lmv->lmv_master_mdt_index;
404                 lslr->lslr_hash_type = lmv->lmv_hash_type;
405
406                 if (flags == LSLF_NONE &&
407                     llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
408                     lmv_is_known_hash_type(lmv->lmv_hash_type))
409                         llmv->ll_hash_type = lmv->lmv_hash_type;
410
411                 if (flags == LSLF_NONE &&
412                     lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
413                     llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
414                         llmv->ll_max_stripe_count = lslr->lslr_stripe_count;
415         } else {
416                 lslr->lslr_stripe_count = 0;
417                 lslr->lslr_index = 0;
418                 lslr->lslr_hash_type = 0;
419         }
420
421         return 0;
422 }
423
424 /**
425  * Record the slave LMV EA in the lfsck_lmv::ll_lslr.
426  *
427  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is free,
428  * then fill the slot with the given @lnr/@lmv/@flags directly (maybe need to
429  * extend the lfsck_lmv::ll_lslr buffer).
430  *
431  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is taken
432  * by other shard, then the LFSCK will try to resolve the conflict by checking
433  * the two conflict shards' flags, and try other possible slot (if one of them
434  * claims another possible @shard_idx).
435  *
436  * 1) If one of the two conflict shards can be recorded in another slot, then
437  *    it is OK, go ahead. Otherwise,
438  *
439  * 2) If one of them is dangling name entry, then remove (one of) the dangling
440  *    name entry (and replace related @lslr slot if needed). Otherwise,
441  *
442  * 3) If one of them has no slave LMV EA, then check whether the master LMV
443  *    EA has ever been lost and re-generated (LMV_HASH_FLAG_LOST_LMV in the
444  *    master LMV EA).
445  *
446  * 3.1) If yes, then it is possible that such object is not a real shard of
447  *      the striped directory, instead, it was created by someone after the
448  *      master LMV EA lost with the name that matches the shard naming rule.
449  *      Then the LFSCK will remove the master LMV EA and mark the striped
450  *      directory as read-only to allow those non-shard files to be visible
451  *      to client.
452  *
453  * 3.2) If no, then remove (one of) the object what has no slave LMV EA.
454  *
455  * 4) If all above efforts cannot work, then the LFSCK cannot know how to
456  *    recover the striped directory. To make the administrator can see the
457  *    conflicts, the LFSCK will remove the master LMV EA and mark the striped
458  *    directory as read-only.
459  *
460  * This function may be called recursively, to prevent overflow, we define
461  * LFSCK_REC_LMV_MAX_DEPTH to restrict the recursive call depth.
462  *
463  * \param[in] env       pointer to the thread context
464  * \param[in] com       pointer to the lfsck component
465  * \param[in] lnr       contain the shard's FID to fill the @lslr slot,
466  *                      it also records the known max filled index and
467  *                      the known max stripe count
468  * \param[in] lmv       pointer to the slave LMV EA to be recorded
469  * \param[in] shard_idx the shard's index used for locating the @lslr slot,
470  *                      it can be the index stored in the shard's name,
471  *                      it also can be the index stored in the slave LMV EA
472  *                      (for recursive case)
473  * \param[in] flags     the shard's flags to be recorded in the @lslr slot
474  *                      to indicate the shard status, such as whether has
475  *                      slave LMV EA, whether dangling name entry, whether
476  *                      the name entry and slave LMV EA unmatched, and ect
477  * \param[in] flags2    when be called recursively, the @flags2 tells the
478  *                      former conflict shard's flags in the @lslr slot.
479  * \param[in,out] depth To prevent to be called recurisively too deep,
480  *                      we define the max depth can be called recursively
481  *                      (LFSCK_REC_LMV_MAX_DEPTH)
482  *
483  * \retval              zero for succeed
484  * \retval              "-ERANGE" for invalid @shard_idx
485  * \retval              "-EEXIST" for the required lslr slot has been
486  *                      occupied by other shard
487  * \retval              other negative error number on failure
488  */
489 static int lfsck_record_lmv(const struct lu_env *env,
490                             struct lfsck_component *com,
491                             struct lfsck_namespace_req *lnr,
492                             struct lmv_mds_md_v1 *lmv, __u32 shard_idx,
493                             __u32 flags, __u32 flags2, __u32 *depth)
494 {
495         struct lfsck_instance      *lfsck = com->lc_lfsck;
496         struct lfsck_lmv           *llmv  = lnr->lnr_lmv;
497         struct dt_object           *dir   = lnr->lnr_obj;
498         const struct lu_fid        *fid   = &lnr->lnr_fid;
499         struct lfsck_slave_lmv_rec *lslr;
500         struct lfsck_rec_lmv_save  *lrls;
501         int                         index = shard_idx;
502         int                         rc    = 0;
503         ENTRY;
504
505         CDEBUG(D_LFSCK, "%s: record slave LMV EA for the striped directory "
506                DFID": shard = "DFID", index = %u, flags = %u, flags2 = %u, "
507                "depth = %d\n", lfsck_lfsck2name(lfsck),
508                PFID(lfsck_dto2fid(dir)), PFID(fid),
509                index, flags, flags2, *depth);
510
511         if (index < 0 || index >= LFSCK_LMV_MAX_STRIPES)
512                 RETURN(-ERANGE);
513
514         if (index >= llmv->ll_stripes_allocated) {
515                 struct lfsck_slave_lmv_rec *new_lslr;
516                 int new_stripes = index + 1;
517                 size_t old_size = sizeof(*lslr) * llmv->ll_stripes_allocated;
518
519                 OBD_ALLOC_LARGE(new_lslr, sizeof(*new_lslr) * new_stripes);
520                 if (new_lslr == NULL) {
521                         llmv->ll_failed = 1;
522
523                         RETURN(-ENOMEM);
524                 }
525
526                 memcpy(new_lslr, llmv->ll_lslr, old_size);
527                 OBD_FREE_LARGE(llmv->ll_lslr, old_size);
528                 llmv->ll_stripes_allocated = new_stripes;
529                 llmv->ll_lslr = new_lslr;
530         }
531
532         lslr = llmv->ll_lslr + index;
533         if (unlikely(lu_fid_eq(&lslr->lslr_fid, fid)))
534                 RETURN(0);
535
536         if (fid_is_zero(&lslr->lslr_fid)) {
537                 lslr->lslr_fid = *fid;
538                 if (lmv != NULL) {
539                         lslr->lslr_stripe_count = lmv->lmv_stripe_count;
540                         lslr->lslr_index = lmv->lmv_master_mdt_index;
541                         lslr->lslr_hash_type = lmv->lmv_hash_type;
542
543                         if (flags == LSLF_NONE &&
544                             llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
545                             lmv_is_known_hash_type(lmv->lmv_hash_type))
546                                 llmv->ll_hash_type = lmv->lmv_hash_type;
547
548                         if (flags == LSLF_NONE &&
549                             lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
550                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
551                                 llmv->ll_max_stripe_count =
552                                                         lslr->lslr_stripe_count;
553                 }
554
555                 lslr->lslr_flags = flags;
556                 llmv->ll_stripes_filled++;
557
558                 if (llmv->ll_max_filled_off < index)
559                         llmv->ll_max_filled_off = index;
560
561                 RETURN(0);
562         }
563
564         (*depth)++;
565         if (flags != LSLF_BAD_INDEX2)
566                 LASSERTF(*depth == 1, "depth = %d\n", *depth);
567
568         /* Handle conflict cases. */
569         switch (lslr->lslr_flags) {
570         case LSLF_NONE:
571         case LSLF_BAD_INDEX2:
572                 /* The existing one is a normal valid object. */
573                 switch (flags) {
574                 case LSLF_NONE:
575                         /* The two 'valid' name entries claims the same
576                          * index, the LFSCK cannot distinguish which one
577                          * is correct. Then remove the master LMV EA to
578                          * make all shards to be visible to client, and
579                          * mark the master MDT-object as read-only. The
580                          * administrator can handle the conflict with
581                          * more human knowledge. */
582                         rc = lfsck_remove_lmv(env, com, lnr);
583                         break;
584                 case LSLF_BAD_INDEX2:
585                         GOTO(out, rc = -EEXIST);
586                 case LSLF_NO_LMVEA:
587
588 no_lmvea:
589                         if (llmv->ll_lmv.lmv_hash_type &
590                             LMV_HASH_FLAG_LOST_LMV) {
591                                 /* If the master LMV EA was re-generated
592                                  * by the former LFSCK reparation, and
593                                  * before such reparation, someone has
594                                  * created the conflict object, but the
595                                  * LFSCK did not detect such conflict,
596                                  * then we have to remove the master
597                                  * LMV EA and mark the master MDT-object
598                                  * as read-only. The administrator can
599                                  * handle the conflict with more human
600                                  * knowledge. */
601                                 rc = lfsck_remove_lmv(env, com, lnr);
602                         } else {
603                                 /* Otherwise, remove the current name entry,
604                                  * and add its FID in the LFSCK tracing file
605                                  * for further processing. */
606                                 rc = lfsck_namespace_trace_update(env, com, fid,
607                                                 LNTF_CHECK_PARENT, true);
608                                 if (rc == 0)
609                                         rc = lfsck_remove_dirent(env, com, dir,
610                                                                  fid, index);
611                         }
612
613                         break;
614                 case LSLF_DANGLING:
615                         /* Remove the current dangling name entry. */
616                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
617                         break;
618                 case LSLF_BAD_INDEX1:
619                         index = lmv->lmv_master_mdt_index;
620                         lmv->lmv_master_mdt_index = shard_idx;
621                         /* The name entry claims an index that is conflict
622                          * with a valid existing name entry, then try the
623                          * index in the lmv recursively. */
624                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
625                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
626                         lmv->lmv_master_mdt_index = index;
627                         if (rc == -ERANGE || rc == -EEXIST)
628                                 /* The index in the lmv is invalid or
629                                  * also conflict with other. Then we do
630                                  * not know how to resolve the conflict.
631                                  * We will handle it as handle the case
632                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
633                                 rc = lfsck_remove_lmv(env, com, lnr);
634
635                         break;
636                 default:
637                         break;
638                 }
639
640                 break;
641         case LSLF_NO_LMVEA:
642                 /* The existing one has no slave LMV EA. */
643                 switch (flags) {
644                 case LSLF_NONE:
645
646 none:
647                         if (llmv->ll_lmv.lmv_hash_type &
648                             LMV_HASH_FLAG_LOST_LMV) {
649                                 /* If the master LMV EA was re-generated
650                                  * by the former LFSCK reparation, and
651                                  * before such reparation, someone has
652                                  * created the conflict object, but the
653                                  * LFSCK did not detect such conflict,
654                                  * then we have to remove the master
655                                  * LMV EA and mark the master MDT-object
656                                  * as read-only. The administrator can
657                                  * handle the conflict with more human
658                                  * knowledge. */
659                                 rc = lfsck_remove_lmv(env, com, lnr);
660                         } else {
661                                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
662                                 lrls->lrls_fid = lslr->lslr_fid;
663                                 /* Otherwise, remove the existing name entry,
664                                  * and add its FID in the LFSCK tracing file
665                                  * for further processing. Refill the slot
666                                  * with current slave LMV EA. */
667                                 rc = lfsck_namespace_trace_update(env,
668                                                 com, &lrls->lrls_fid,
669                                                 LNTF_CHECK_PARENT, true);
670                                 if (rc == 0)
671                                         rc = lfsck_replace_lmv(env, com, lslr,
672                                                         lnr, lmv, index, flags);
673                         }
674
675                         break;
676                 case LSLF_BAD_INDEX2:
677                         if (flags2 >= lslr->lslr_flags)
678                                 GOTO(out, rc = -EEXIST);
679
680                         goto none;
681                 case LSLF_NO_LMVEA:
682                         goto no_lmvea;
683                 case LSLF_DANGLING:
684                         /* Remove the current dangling name entry. */
685                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
686                         break;
687                 case LSLF_BAD_INDEX1:
688                         index = lmv->lmv_master_mdt_index;
689                         lmv->lmv_master_mdt_index = shard_idx;
690                         /* The name entry claims an index that is conflict
691                          * with a valid existing name entry, then try the
692                          * index in the lmv recursively. */
693                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
694                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
695                         lmv->lmv_master_mdt_index = index;
696                         if (rc == -ERANGE || rc == -EEXIST) {
697                                 index = shard_idx;
698                                 goto no_lmvea;
699                         }
700
701                         break;
702                 default:
703                         break;
704                 }
705
706                 break;
707         case LSLF_DANGLING:
708                 /* The existing one is a dangling name entry. */
709                 switch (flags) {
710                 case LSLF_NONE:
711                 case LSLF_BAD_INDEX2:
712                 case LSLF_NO_LMVEA:
713                         /* Remove the existing dangling name entry.
714                          * Refill the lslr slot with the given LMV. */
715                         rc = lfsck_replace_lmv(env, com, lslr, lnr,
716                                                lmv, index, flags);
717                         break;
718                 case LSLF_DANGLING:
719                         /* Two dangling name entries conflict,
720                          * remove the current one. */
721                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
722                         break;
723                 case LSLF_BAD_INDEX1:
724                         index = lmv->lmv_master_mdt_index;
725                         lmv->lmv_master_mdt_index = shard_idx;
726                         /* The name entry claims an index that is conflict
727                          * with a valid existing name entry, then try the
728                          * index in the lmv recursively. */
729                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
730                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
731                         lmv->lmv_master_mdt_index = index;
732                         if (rc == -ERANGE || rc == -EEXIST)
733                                 /* If the index in the lmv is invalid or
734                                  * also conflict with other, then remove
735                                  * the existing dangling name entry.
736                                  * Refill the lslr slot with the given LMV. */
737                                 rc = lfsck_replace_lmv(env, com, lslr, lnr,
738                                                        lmv, shard_idx, flags);
739
740                         break;
741                 default:
742                         break;
743                 }
744
745                 break;
746         case LSLF_BAD_INDEX1: {
747                 if (*depth >= LFSCK_REC_LMV_MAX_DEPTH)
748                         goto conflict;
749
750                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
751                 lrls->lrls_fid = lnr->lnr_fid;
752                 lrls->lrls_lmv = *lmv;
753
754                 lnr->lnr_fid = lslr->lslr_fid;
755                 lmv->lmv_master_mdt_index = index;
756                 lmv->lmv_stripe_count = lslr->lslr_stripe_count;
757                 lmv->lmv_hash_type = lslr->lslr_hash_type;
758                 index = lslr->lslr_index;
759
760                 /* The existing one has another possible slot,
761                  * try it recursively. */
762                 rc = lfsck_record_lmv(env, com, lnr, lmv, index,
763                                       LSLF_BAD_INDEX2, flags, depth);
764                 *lmv = lrls->lrls_lmv;
765                 lnr->lnr_fid = lrls->lrls_fid;
766                 index = shard_idx;
767                 if (rc != 0) {
768                         if (rc == -ERANGE || rc == -EEXIST)
769                                 goto conflict;
770
771                         break;
772                 }
773
774                 lslr->lslr_fid = *fid;
775                 lslr->lslr_flags = flags;
776                 if (lmv != NULL) {
777                         lslr->lslr_stripe_count = lmv->lmv_stripe_count;
778                         lslr->lslr_index = lmv->lmv_master_mdt_index;
779                         lslr->lslr_hash_type = lmv->lmv_hash_type;
780
781                         if (flags == LSLF_NONE &&
782                             llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
783                             lmv_is_known_hash_type(lmv->lmv_hash_type))
784                                 llmv->ll_hash_type = lmv->lmv_hash_type;
785
786                         if (flags == LSLF_NONE &&
787                             lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
788                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
789                                 llmv->ll_max_stripe_count =
790                                                         lslr->lslr_stripe_count;
791                 } else {
792                         lslr->lslr_stripe_count = 0;
793                         lslr->lslr_index = 0;
794                         lslr->lslr_hash_type = 0;
795                 }
796
797                 break;
798
799 conflict:
800                 switch (flags) {
801                 case LSLF_NONE:
802                         /* The two 'valid' name entries claims the same
803                          * index, the LFSCK cannot distinguish which one
804                          * is correct. Then remove the master LMV EA to
805                          * make all shards to be visible to client, and
806                          * mark the master MDT-object as read-only. The
807                          * administrator can handle the conflict with
808                          * more human knowledge. */
809                         rc = lfsck_remove_lmv(env, com, lnr);
810                         break;
811                 case LSLF_BAD_INDEX2:
812                         GOTO(out, rc = -EEXIST);
813                 case LSLF_NO_LMVEA:
814                         goto no_lmvea;
815                 case LSLF_DANGLING:
816                         /* Remove the current dangling name entry. */
817                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
818                         break;
819                 case LSLF_BAD_INDEX1:
820                         index = lmv->lmv_master_mdt_index;
821                         lmv->lmv_master_mdt_index = shard_idx;
822                         /* The name entry claims an index that is conflict
823                          * with a valid existing name entry, then try the
824                          * index in the lmv recursively. */
825                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
826                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
827                         lmv->lmv_master_mdt_index = index;
828                         if (rc == -ERANGE || rc == -EEXIST)
829                                 /* The index in the lmv is invalid or
830                                  * also conflict with other. Then we do
831                                  * not know how to resolve the conflict.
832                                  * We will handle it as handle the case
833                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
834                                 rc = lfsck_remove_lmv(env, com, lnr);
835
836                         break;
837                 }
838
839                 break;
840         }
841         default:
842                 break;
843         }
844
845         if (rc < 0)
846                 llmv->ll_failed = 1;
847
848         GOTO(out, rc);
849
850 out:
851         (*depth)--;
852
853         return rc > 0 ? 0 : rc;
854 }
855
856 int lfsck_read_stripe_lmv(const struct lu_env *env, struct dt_object *obj,
857                           struct lmv_mds_md_v1 *lmv)
858 {
859         struct dt_object *bottom;
860         int               rc;
861
862         /* Currently, we only store the LMV header on disk. It is the LOD's
863          * duty to iterate the master MDT-object's directory to compose the
864          * integrated LMV EA. But here, we only want to load the LMV header,
865          * so we need to bypass LOD to avoid unnecessary iteration in LOD. */
866         bottom = lu2dt(container_of0(obj->do_lu.lo_header->loh_layers.prev,
867                                      struct lu_object, lo_linkage));
868         if (unlikely(bottom == NULL))
869                 return -ENOENT;
870
871         dt_read_lock(env, bottom, 0);
872         rc = dt_xattr_get(env, bottom, lfsck_buf_get(env, lmv, sizeof(*lmv)),
873                           XATTR_NAME_LMV, BYPASS_CAPA);
874         dt_read_unlock(env, bottom);
875         if (rc != sizeof(*lmv))
876                 return rc > 0 ? -EINVAL : rc;
877
878         lfsck_lmv_header_le_to_cpu(lmv, lmv);
879         if ((lmv->lmv_magic == LMV_MAGIC &&
880              !(lmv->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) ||
881             (lmv->lmv_magic == LMV_MAGIC_STRIPE &&
882              !(lmv->lmv_hash_type & LMV_HASH_FLAG_DEAD)))
883                 return 0;
884
885         return -ENODATA;
886 }
887
888 /**
889  * Parse the shard's index from the given shard name.
890  *
891  * The valid shard name/type should be:
892  * 1) The type must be S_IFDIR
893  * 2) The name should be $FID:$index
894  * 3) the index should within valid range.
895  *
896  * \param[in] env       pointer to the thread context
897  * \param[in] name      the shard name
898  * \param[in] namelen   the name length
899  * \param[in] type      the entry's type
900  * \param[in] fid       the entry's FID
901  *
902  * \retval              zero or positive number for the index from the name
903  * \retval              negative error number on failure
904  */
905 int lfsck_shard_name_to_index(const struct lu_env *env, const char *name,
906                               int namelen, __u16 type, const struct lu_fid *fid)
907 {
908         char    *name2  = lfsck_env_info(env)->lti_tmpbuf2;
909         int      len;
910         int      idx    = 0;
911
912         if (!S_ISDIR(type))
913                 return -ENOTDIR;
914
915         LASSERT(name != name2);
916
917         len = snprintf(name2, sizeof(lfsck_env_info(env)->lti_tmpbuf2),
918                        DFID":", PFID(fid));
919         if (namelen < len + 1 || memcmp(name, name2, len) != 0)
920                 return -EINVAL;
921
922         do {
923                 if (!isdigit(name[len]))
924                         return -EINVAL;
925
926                 idx = idx * 10 + name[len++] - '0';
927         } while (len < namelen);
928
929         if (idx >= LFSCK_LMV_MAX_STRIPES)
930                 return -EINVAL;
931
932         return idx;
933 }
934
935 bool lfsck_is_valid_slave_name_entry(const struct lu_env *env,
936                                      struct lfsck_lmv *llmv,
937                                      const char *name, int namelen)
938 {
939         struct lmv_mds_md_v1    *lmv;
940         int                      idx;
941
942         if (llmv == NULL || !llmv->ll_lmv_slave || !llmv->ll_lmv_verified)
943                 return true;
944
945         lmv = &llmv->ll_lmv;
946         idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
947                                        lmv->lmv_stripe_count,
948                                        name, namelen);
949         if (unlikely(idx != lmv->lmv_master_mdt_index))
950                 return false;
951
952         return true;
953 }
954
955 /**
956  * Check whether the given name is a valid entry under the @parent.
957  *
958  * If the @parent is a striped directory then the @child should one
959  * shard of the striped directory, its name should be $FID:$index.
960  *
961  * If the @parent is a shard of a striped directory, then the name hash
962  * should match the MDT, otherwise it is invalid.
963  *
964  * \param[in] env       pointer to the thread context
965  * \param[in] parent    the parent directory
966  * \param[in] child     the child object to be checked
967  * \param[in] cname     the name for the @child in the parent directory
968  *
969  * \retval              positive number for invalid name entry
970  * \retval              0 if the name is valid or uncertain
971  * \retval              negative error number on failure
972  */
973 int lfsck_namespace_check_name(const struct lu_env *env,
974                                struct dt_object *parent,
975                                struct dt_object *child,
976                                const struct lu_name *cname)
977 {
978         struct lmv_mds_md_v1    *lmv = &lfsck_env_info(env)->lti_lmv;
979         int                      idx;
980         int                      rc;
981
982         rc = lfsck_read_stripe_lmv(env, parent, lmv);
983         if (rc != 0)
984                 RETURN(rc == -ENODATA ? 0 : rc);
985
986         if (lmv->lmv_magic == LMV_MAGIC_STRIPE) {
987                 if (!lfsck_is_valid_slave_lmv(lmv))
988                         return 0;
989
990                 idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
991                                                lmv->lmv_stripe_count,
992                                                cname->ln_name,
993                                                cname->ln_namelen);
994                 if (unlikely(idx != lmv->lmv_master_mdt_index))
995                         return 1;
996         } else if (lfsck_shard_name_to_index(env, cname->ln_name,
997                         cname->ln_namelen, lfsck_object_type(child),
998                         lfsck_dto2fid(child)) < 0) {
999                 return 1;
1000         }
1001
1002         return 0;
1003 }
1004
1005 /**
1006  * Update the object's LMV EA with the given @lmv.
1007  *
1008  * \param[in] env       pointer to the thread context
1009  * \param[in] com       pointer to the lfsck component
1010  * \param[in] obj       pointer to the object which LMV EA will be updated
1011  * \param[in] lmv       pointer to buffer holding the new LMV EA
1012  * \param[in] locked    whether the caller has held ldlm lock on the @obj or not
1013  *
1014  * \retval              positive number for nothing to be done
1015  * \retval              zero if updated successfully
1016  * \retval              negative error number on failure
1017  */
1018 int lfsck_namespace_update_lmv(const struct lu_env *env,
1019                                struct lfsck_component *com,
1020                                struct dt_object *obj,
1021                                struct lmv_mds_md_v1 *lmv, bool locked)
1022 {
1023         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1024         struct lmv_mds_md_v1            *lmv4   = &info->lti_lmv4;
1025         struct lu_buf                   *buf    = &info->lti_buf;
1026         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1027         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
1028         struct thandle                  *th     = NULL;
1029         struct lustre_handle             lh     = { 0 };
1030         int                              rc     = 0;
1031         int                              rc1    = 0;
1032         ENTRY;
1033
1034         LASSERT(lmv4 != lmv);
1035
1036         lfsck_lmv_header_cpu_to_le(lmv4, lmv);
1037         lfsck_buf_init(buf, lmv4, sizeof(*lmv4));
1038
1039         if (!locked) {
1040                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1041                                       MDS_INODELOCK_UPDATE |
1042                                       MDS_INODELOCK_XATTR, LCK_EX);
1043                 if (rc != 0)
1044                         GOTO(log, rc);
1045         }
1046
1047         th = dt_trans_create(env, dev);
1048         if (IS_ERR(th))
1049                 GOTO(log, rc = PTR_ERR(th));
1050
1051         /* For remote updating LMV EA, there will be further LFSCK action on
1052          * remote MDT after the updating, so update the LMV EA synchronously. */
1053         if (dt_object_remote(obj))
1054                 th->th_sync = 1;
1055
1056         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th);
1057         if (rc != 0)
1058                 GOTO(stop, rc);
1059
1060         rc = dt_trans_start_local(env, dev, th);
1061         if (rc != 0)
1062                 GOTO(stop, rc);
1063
1064         dt_write_lock(env, obj, 0);
1065         if (unlikely(lfsck_is_dead_obj(obj)))
1066                 GOTO(unlock, rc = 1);
1067
1068         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1069                 GOTO(unlock, rc = 0);
1070
1071         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th, BYPASS_CAPA);
1072
1073         GOTO(unlock, rc);
1074
1075 unlock:
1076         dt_write_unlock(env, obj);
1077
1078 stop:
1079         rc1 = dt_trans_stop(env, dev, th);
1080         if (rc == 0)
1081                 rc = rc1;
1082
1083 log:
1084         lfsck_ibits_unlock(&lh, LCK_EX);
1085         CDEBUG(D_LFSCK, "%s: namespace LFSCK updated the %s LMV EA "
1086                "for the object "DFID": rc = %d\n",
1087                lfsck_lfsck2name(lfsck),
1088                lmv->lmv_magic == LMV_MAGIC ? "master" : "slave",
1089                PFID(lfsck_dto2fid(obj)), rc);
1090
1091         return rc;
1092 }
1093
1094 /**
1095  * Check whether allow to re-genereate the lost master LMV EA.
1096  *
1097  * If the master MDT-object of the striped directory lost its master LMV EA,
1098  * then before the LFSCK repaired the striped directory, some ones may have
1099  * created some objects (that are not normal shards of the striped directory)
1100  * under the master MDT-object. If such case happend, then the LFSCK cannot
1101  * re-generate the lost master LMV EA to keep those objects to be visible to
1102  * client.
1103  *
1104  * \param[in] env       pointer to the thread context
1105  * \param[in] com       pointer to the lfsck component
1106  * \param[in] obj       pointer to the master MDT-object to be checked
1107  * \param[in] cfid      the shard's FID used for verification
1108  * \param[in] cidx      the shard's index used for verification
1109  *
1110  * \retval              positive number if not allow to re-generate LMV EA
1111  * \retval              zero if allow to re-generate LMV EA
1112  * \retval              negative error number on failure
1113  */
1114 static int lfsck_allow_regenerate_master_lmv(const struct lu_env *env,
1115                                              struct lfsck_component *com,
1116                                              struct dt_object *obj,
1117                                              const struct lu_fid *cfid,
1118                                              __u32 cidx)
1119 {
1120         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1121         struct lu_fid                   *tfid   = &info->lti_fid3;
1122         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1123         struct lu_dirent                *ent    =
1124                         (struct lu_dirent *)info->lti_key;
1125         const struct dt_it_ops          *iops;
1126         struct dt_it                    *di;
1127         __u64                            cookie;
1128         __u32                            args;
1129         int                              rc;
1130         __u16                            type;
1131         ENTRY;
1132
1133         if (unlikely(!dt_try_as_dir(env, obj)))
1134                 RETURN(-ENOTDIR);
1135
1136         /* Check whether the shard and the master MDT-object matches or not. */
1137         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
1138                  PFID(cfid), cidx);
1139         rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
1140                        (const struct dt_key *)info->lti_tmpbuf, BYPASS_CAPA);
1141         if (rc != 0)
1142                 RETURN(rc);
1143
1144         if (!lu_fid_eq(tfid, cfid))
1145                 RETURN(-ENOENT);
1146
1147         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1148         iops = &obj->do_index_ops->dio_it;
1149         di = iops->init(env, obj, args, BYPASS_CAPA);
1150         if (IS_ERR(di))
1151                 RETURN(PTR_ERR(di));
1152
1153         rc = iops->load(env, di, 0);
1154         if (rc == 0)
1155                 rc = iops->next(env, di);
1156         else if (rc > 0)
1157                 rc = 0;
1158
1159         if (rc != 0)
1160                 GOTO(out, rc);
1161
1162         do {
1163                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1164                 if (rc == 0)
1165                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1166
1167                 if (rc != 0)
1168                         GOTO(out, rc);
1169
1170                 /* skip dot and dotdot entries */
1171                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1172                         goto next;
1173
1174                 /* If the subdir name does not match the shard name rule, then
1175                  * it is quite possible that it is NOT a shard, but created by
1176                  * someone after the master MDT-object lost the master LMV EA.
1177                  * But it is also possible that the subdir name entry crashed,
1178                  * under such double failure cases, the LFSCK cannot know how
1179                  * to repair the inconsistency. For data safe, the LFSCK will
1180                  * mark the master MDT-object as read-only. The administrator
1181                  * can fix the bad shard name manually, then run LFSCK again.
1182                  *
1183                  * XXX: If the subdir name matches the shard name rule, but it
1184                  *      is not a real shard of the striped directory, instead,
1185                  *      it was created by someone after the master MDT-object
1186                  *      lost the LMV EA, then re-generating the master LMV EA
1187                  *      will cause such subdir to be invisible to client, and
1188                  *      if its index occupies some lost shard index, then the
1189                  *      LFSCK will use it to replace the bad shard, and cause
1190                  *      the subdir (itself) to be invisible for ever. */
1191                 if (lfsck_shard_name_to_index(env, ent->lde_name,
1192                                 ent->lde_namelen, type, &ent->lde_fid) < 0)
1193                         GOTO(out, rc = 1);
1194
1195 next:
1196                 rc = iops->next(env, di);
1197         } while (rc == 0);
1198
1199         GOTO(out, rc = 0);
1200
1201 out:
1202         iops->put(env, di);
1203         iops->fini(env, di);
1204
1205         return rc;
1206 }
1207
1208 /**
1209  * Notify remote LFSCK instance that the object's LMV EA has been updated.
1210  *
1211  * \param[in] env       pointer to the thread context
1212  * \param[in] com       pointer to the lfsck component
1213  * \param[in] obj       pointer to the object on which the LMV EA will be set
1214  * \param[in] event     indicate either master or slave LMV EA has been updated
1215  * \param[in] flags     indicate which element(s) in the LMV EA has been updated
1216  * \param[in] index     the MDT index on which the LFSCK instance to be notified
1217  *
1218  * \retval              positive number if nothing to be done
1219  * \retval              zero for succeed
1220  * \retval              negative error number on failure
1221  */
1222 static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
1223                                              struct lfsck_component *com,
1224                                              struct dt_object *obj,
1225                                              __u32 event, __u32 flags,
1226                                              __u32 index)
1227 {
1228         struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
1229         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
1230         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1231         struct lfsck_tgt_desc           *ltd    = NULL;
1232         struct ptlrpc_request           *req    = NULL;
1233         int                              rc;
1234         ENTRY;
1235
1236         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
1237         if (ltd == NULL)
1238                 GOTO(out, rc = -ENODEV);
1239
1240         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1241                                    &RQF_LFSCK_NOTIFY);
1242         if (req == NULL)
1243                 GOTO(out, rc = -ENOMEM);
1244
1245         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1246         if (rc != 0) {
1247                 ptlrpc_request_free(req);
1248
1249                 GOTO(out, rc);
1250         }
1251
1252         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1253         memset(lr, 0, sizeof(*lr));
1254         lr->lr_event = event;
1255         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1256         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1257         lr->lr_fid = *fid;
1258         lr->lr_flags = flags;
1259
1260         ptlrpc_request_set_replen(req);
1261         rc = ptlrpc_queue_wait(req);
1262         ptlrpc_req_finished(req);
1263
1264         GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
1265
1266 out:
1267         CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
1268                "object "DFID" on MDT %x remotely with event %u, flags %u: "
1269                "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
1270                event, flags, rc);
1271
1272         if (ltd != NULL)
1273                 lfsck_tgt_put(ltd);
1274
1275         return rc;
1276 }
1277
1278 /**
1279  * Generate request for local LFSCK instance to rescan the striped directory.
1280  *
1281  * \param[in] env       pointer to the thread context
1282  * \param[in] com       pointer to the lfsck component
1283  * \param[in] obj       pointer to the striped directory to be rescanned
1284  *
1285  * \retval              positive number if nothing to be done
1286  * \retval              zero for succeed
1287  * \retval              negative error number on failure
1288  */
1289 int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
1290                                             struct lfsck_component *com,
1291                                             struct dt_object *obj)
1292 {
1293         struct lfsck_instance      *lfsck = com->lc_lfsck;
1294         struct lfsck_namespace     *ns    = com->lc_file_ram;
1295         struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
1296         struct lfsck_lmv_unit      *llu;
1297         struct lfsck_lmv           *llmv;
1298         struct lfsck_slave_lmv_rec *lslr;
1299         int                         count = 0;
1300         int                         rc;
1301         ENTRY;
1302
1303         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1304                 RETURN(0);
1305
1306         rc = lfsck_read_stripe_lmv(env, obj, lmv4);
1307         if (rc != 0)
1308                 RETURN(rc);
1309
1310         OBD_ALLOC_PTR(llu);
1311         if (unlikely(llu == NULL))
1312                 RETURN(-ENOMEM);
1313
1314         if (lmv4->lmv_stripe_count < 1)
1315                 count = LFSCK_LMV_DEF_STRIPES;
1316         else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1317                 count = LFSCK_LMV_MAX_STRIPES;
1318         else
1319                 count = lmv4->lmv_stripe_count;
1320
1321         OBD_ALLOC_LARGE(lslr, sizeof(struct lfsck_slave_lmv_rec) * count);
1322         if (lslr == NULL) {
1323                 OBD_FREE_PTR(llu);
1324
1325                 RETURN(-ENOMEM);
1326         }
1327
1328         INIT_LIST_HEAD(&llu->llu_link);
1329         llu->llu_lfsck = lfsck;
1330         llu->llu_obj = lfsck_object_get(obj);
1331         llmv = &llu->llu_lmv;
1332         llmv->ll_lmv_master = 1;
1333         llmv->ll_inline = 1;
1334         atomic_set(&llmv->ll_ref, 1);
1335         llmv->ll_stripes_allocated = count;
1336         llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
1337         llmv->ll_lslr = lslr;
1338         llmv->ll_lmv = *lmv4;
1339
1340         down_write(&com->lc_sem);
1341         if (ns->ln_status != LS_SCANNING_PHASE1 &&
1342             ns->ln_status != LS_SCANNING_PHASE2) {
1343                 ns->ln_striped_dirs_skipped++;
1344                 up_write(&com->lc_sem);
1345                 lfsck_lmv_put(env, llmv);
1346         } else {
1347                 ns->ln_striped_dirs_repaired++;
1348                 spin_lock(&lfsck->li_lock);
1349                 list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
1350                 spin_unlock(&lfsck->li_lock);
1351                 up_write(&com->lc_sem);
1352         }
1353
1354         RETURN(0);
1355 }
1356
1357 /**
1358  * Set master LMV EA for the specified striped directory.
1359  *
1360  * First, if the master MDT-object of a striped directory lost its LMV EA,
1361  * then there may be some users have created some files under the master
1362  * MDT-object directly. Under such case, the LFSCK cannot re-generate LMV
1363  * EA for the master MDT-object, because we should keep the existing files
1364  * to be visible to client. Then the LFSCK will mark the striped directory
1365  * as read-only and keep it there to be handled by administrator manually.
1366  *
1367  * If nobody has created files under the master MDT-object of the striped
1368  * directory, then we will set the master LMV EA and generate a new rescan
1369  * (the striped directory) request that will be handled later by the LFSCK
1370  * instance on the MDT later.
1371  *
1372  * \param[in] env       pointer to the thread context
1373  * \param[in] com       pointer to the lfsck component
1374  * \param[in] dir       pointer to the object on which the LMV EA will be set
1375  * \param[in] lmv       pointer to the buffer holding the new LMV EA
1376  * \param[in] cfid      the shard's FID used for verification
1377  * \param[in] cidx      the shard's index used for verification
1378  * \param[in] flags     to indicate which element(s) in the LMV EA will be set
1379  *
1380  * \retval              positive number if nothing to be done
1381  * \retval              zero for succeed
1382  * \retval              negative error number on failure
1383  */
1384 static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
1385                                           struct lfsck_component *com,
1386                                           struct dt_object *dir,
1387                                           struct lmv_mds_md_v1 *lmv,
1388                                           const struct lu_fid *cfid,
1389                                           __u32 cidx, __u32 flags)
1390 {
1391         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1392         struct lmv_mds_md_v1            *lmv3   = &info->lti_lmv3;
1393         struct lu_seq_range             *range  = &info->lti_range;
1394         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1395         struct seq_server_site          *ss     =
1396                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1397         struct dt_object                *obj;
1398         struct lustre_handle             lh     = { 0 };
1399         int                              pidx   = -1;
1400         int                              rc     = 0;
1401         ENTRY;
1402
1403         /* Find the bottom object to bypass LOD when set LMV EA. */
1404         obj = lu2dt(container_of0(dir->do_lu.lo_header->loh_layers.prev,
1405                                   struct lu_object, lo_linkage));
1406         if (unlikely(obj == NULL))
1407                 RETURN(-ENOENT);
1408
1409         fld_range_set_mdt(range);
1410         rc = fld_server_lookup(env, ss->ss_server_fld,
1411                                fid_seq(lfsck_dto2fid(obj)), range);
1412         if (rc != 0)
1413                 GOTO(log, rc);
1414
1415         pidx = range->lsr_index;
1416         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1417                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1418                               LCK_EX);
1419         if (rc != 0)
1420                 GOTO(log, rc);
1421
1422         rc = lfsck_read_stripe_lmv(env, obj, lmv3);
1423         if (rc == -ENODATA) {
1424                 if (!(flags & LEF_SET_LMV_ALL))
1425                         GOTO(log, rc);
1426
1427                 *lmv3 = *lmv;
1428         } else if (rc == 0) {
1429                 if (flags & LEF_SET_LMV_ALL)
1430                         GOTO(log, rc = 1);
1431
1432                 if (flags & LEF_SET_LMV_HASH)
1433                         lmv3->lmv_hash_type = lmv->lmv_hash_type;
1434         } else {
1435                 GOTO(log, rc);
1436         }
1437
1438         lmv3->lmv_magic = LMV_MAGIC;
1439         lmv3->lmv_master_mdt_index = pidx;
1440
1441         if (flags & LEF_SET_LMV_ALL) {
1442                 rc = lfsck_allow_regenerate_master_lmv(env, com, obj,
1443                                                        cfid, cidx);
1444                 if (rc > 0) {
1445                         rc = lfsck_disable_master_lmv(env, com, obj, false);
1446
1447                         GOTO(log, rc = (rc == 0 ? 1 : rc));
1448                 }
1449
1450                 if (rc < 0)
1451                         GOTO(log, rc);
1452
1453                 /* To indicate that the master has ever lost LMV EA. */
1454                 lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
1455         }
1456
1457         rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
1458         if (rc == 0 && flags & LEF_SET_LMV_ALL) {
1459                 if (dt_object_remote(obj))
1460                         rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
1461                                                 LE_SET_LMV_MASTER, 0, pidx);
1462                 else
1463                         rc = lfsck_namespace_notify_lmv_master_local(env, com,
1464                                                                      obj);
1465         }
1466
1467         GOTO(log, rc);
1468
1469 log:
1470         lfsck_ibits_unlock(&lh, LCK_EX);
1471         CDEBUG(D_LFSCK, "%s: namespace LFSCK set master LMV EA for the object "
1472                DFID" on the %s MDT %d, flags %x: rc = %d\n",
1473                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1474                dt_object_remote(obj) ? "remote" : "local", pidx, flags, rc);
1475
1476         if (rc <= 0) {
1477                 struct lfsck_namespace *ns = com->lc_file_ram;
1478
1479                 ns->ln_flags |= LF_INCONSISTENT;
1480         }
1481
1482         return rc;
1483 }
1484
1485 /**
1486  * Repair the bad name hash.
1487  *
1488  * If the name hash of some name entry under the striped directory does not
1489  * match the shard of the striped directory, then the LFSCK will repair the
1490  * inconsistency. Ideally, the LFSCK should migrate the name entry from the
1491  * current MDT to the right MDT (another one), but before the async commit
1492  * finished, the LFSCK will change the striped directory's hash type as
1493  * LMV_HASH_TYPE_UNKNOWN and mark the lmv flags as LMV_HASH_FLAG_BAD_TYPE.
1494  *
1495  * \param[in] env       pointer to the thread context
1496  * \param[in] com       pointer to the lfsck component
1497  * \param[in] shard     pointer to the shard of the striped directory that
1498  *                      contains the bad name entry
1499  * \param[in] llmv      pointer to lfsck LMV EA structure
1500  * \param[in] name      the name of the bad name hash
1501  *
1502  * \retval              positive number if nothing to be done
1503  * \retval              zero for succeed
1504  * \retval              negative error number on failure
1505  */
1506 int lfsck_namespace_repair_bad_name_hash(const struct lu_env *env,
1507                                          struct lfsck_component *com,
1508                                          struct dt_object *shard,
1509                                          struct lfsck_lmv *llmv,
1510                                          const char *name)
1511 {
1512         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1513         struct lu_fid                   *pfid   = &info->lti_fid3;
1514         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1515         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1516         struct dt_object                *parent = NULL;
1517         int                              rc     = 0;
1518         ENTRY;
1519
1520         rc = dt_lookup(env, shard, (struct dt_rec *)pfid,
1521                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1522         if (rc != 0 || !fid_is_sane(pfid))
1523                 GOTO(log, rc);
1524
1525         parent = lfsck_object_find_bottom(env, lfsck, pfid);
1526         if (IS_ERR(parent))
1527                 GOTO(log, rc = PTR_ERR(parent));
1528
1529         *lmv2 = llmv->ll_lmv;
1530         lmv2->lmv_hash_type = LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE;
1531         rc = lfsck_namespace_set_lmv_master(env, com, parent, lmv2,
1532                                             lfsck_dto2fid(shard),
1533                                             llmv->ll_lmv.lmv_master_mdt_index,
1534                                             LEF_SET_LMV_HASH);
1535
1536         GOTO(log, rc);
1537
1538 log:
1539         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name hash "
1540                "on the MDT %x, parent "DFID", name %s, shard_%x "DFID
1541                ": rc = %d\n",
1542                lfsck_lfsck2name(lfsck), lfsck_dev_idx(lfsck->li_bottom),
1543                PFID(pfid), name, llmv->ll_lmv.lmv_master_mdt_index,
1544                PFID(lfsck_dto2fid(shard)), rc);
1545
1546         if (parent != NULL && !IS_ERR(parent))
1547                 lfsck_object_put(env, parent);
1548
1549         return rc;
1550 }
1551
1552 /**
1553  * Scan the shard of a striped directory for name hash verification.
1554  *
1555  * During the first-stage scanning, if the LFSCK cannot make sure whether
1556  * the shard of a stripe directory contains valid slave LMV EA or not, then
1557  * it will skip the name hash verification for this shard temporarily, and
1558  * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
1559  * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
1560  * Then in the second-stage scanning, the shard will be re-scanned, and for
1561  * every name entry under the shard, the name hash will be verified, and for
1562  * unmatched name entry, the LFSCK will try to fix it.
1563  *
1564  * \param[in] env       pointer to the thread context
1565  * \param[in] com       pointer to the lfsck component
1566  * \param[in] child     pointer to the directory object to be handled
1567  *
1568  * \retval              positive number for scanning successfully
1569  * \retval              zero for the scanning is paused
1570  * \retval              negative error number on failure
1571  */
1572 int lfsck_namespace_scan_shard(const struct lu_env *env,
1573                                struct lfsck_component *com,
1574                                struct dt_object *child)
1575 {
1576         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1577         struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
1578         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1579         struct lfsck_namespace          *ns     = com->lc_file_ram;
1580         struct ptlrpc_thread            *thread = &lfsck->li_thread;
1581         struct lu_dirent                *ent    =
1582                         (struct lu_dirent *)info->lti_key;
1583         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1584         struct lfsck_lmv                *llmv   = NULL;
1585         const struct dt_it_ops          *iops;
1586         struct dt_it                    *di;
1587         __u64                            cookie;
1588         __u32                            args;
1589         int                              rc;
1590         __u16                            type;
1591         ENTRY;
1592
1593         rc = lfsck_read_stripe_lmv(env, child, lmv);
1594         if (rc != 0)
1595                 RETURN(rc == -ENODATA ? 1 : rc);
1596
1597         if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
1598                 RETURN(1);
1599
1600         if (unlikely(!dt_try_as_dir(env, child)))
1601                 RETURN(-ENOTDIR);
1602
1603         OBD_ALLOC_PTR(llmv);
1604         if (llmv == NULL)
1605                 RETURN(-ENOMEM);
1606
1607         llmv->ll_lmv_slave = 1;
1608         llmv->ll_lmv_verified = 1;
1609         llmv->ll_lmv = *lmv;
1610         atomic_set(&llmv->ll_ref, 1);
1611
1612         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1613         iops = &child->do_index_ops->dio_it;
1614         di = iops->init(env, child, args, BYPASS_CAPA);
1615         if (IS_ERR(di))
1616                 GOTO(out, rc = PTR_ERR(di));
1617
1618         rc = iops->load(env, di, 0);
1619         if (rc == 0)
1620                 rc = iops->next(env, di);
1621         else if (rc > 0)
1622                 rc = 0;
1623
1624         while (rc == 0) {
1625                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1626                     cfs_fail_val > 0) {
1627                         struct l_wait_info lwi;
1628
1629                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1630                                           NULL, NULL);
1631                         l_wait_event(thread->t_ctl_waitq,
1632                                      !thread_is_running(thread),
1633                                      &lwi);
1634
1635                         if (unlikely(!thread_is_running(thread)))
1636                                 GOTO(out, rc = 0);
1637                 }
1638
1639                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1640                 if (rc == 0)
1641                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1642
1643                 if (rc != 0) {
1644                         if (bk->lb_param & LPF_FAILOUT)
1645                                 GOTO(out, rc);
1646
1647                         goto next;
1648                 }
1649
1650                 /* skip dot and dotdot entries */
1651                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1652                         goto next;
1653
1654                 if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
1655                                                      ent->lde_namelen)) {
1656                         ns->ln_flags |= LF_INCONSISTENT;
1657                         rc = lfsck_namespace_repair_bad_name_hash(env, com,
1658                                                 child, llmv, ent->lde_name);
1659                         if (rc >= 0)
1660                                 ns->ln_name_hash_repaired++;
1661                 }
1662
1663                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1664                         GOTO(out, rc);
1665
1666                 /* Rate control. */
1667                 lfsck_control_speed(lfsck);
1668                 if (unlikely(!thread_is_running(thread)))
1669                         GOTO(out, rc = 0);
1670
1671                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
1672                         spin_lock(&lfsck->li_lock);
1673                         thread_set_flags(thread, SVC_STOPPING);
1674                         spin_unlock(&lfsck->li_lock);
1675
1676                         GOTO(out, rc = -EINVAL);
1677                 }
1678
1679 next:
1680                 rc = iops->next(env, di);
1681         }
1682
1683         GOTO(out, rc);
1684
1685 out:
1686         iops->put(env, di);
1687         iops->fini(env, di);
1688         lfsck_lmv_put(env, llmv);
1689
1690         return rc;
1691 }
1692
1693 /**
1694  * Verify the slave object's (of striped directory) LMV EA.
1695  *
1696  * For the slave object of a striped directory, before traversing the shard
1697  * the LFSCK will verify whether its slave LMV EA matches its parent's master
1698  * LMV EA or not.
1699  *
1700  * \param[in] env       pointer to the thread context
1701  * \param[in] com       pointer to the lfsck component
1702  * \param[in] obj       pointer to the object which LMV EA will be checked
1703  * \param[in] llmv      pointer to buffer holding the slave LMV EA
1704  *
1705  * \retval              zero for succeed
1706  * \retval              negative error number on failure
1707  */
1708 int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
1709                                         struct lfsck_component *com,
1710                                         struct dt_object *obj,
1711                                         struct lfsck_lmv *llmv)
1712 {
1713         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1714         char                            *name   = info->lti_key;
1715         char                            *name2;
1716         struct lu_fid                   *pfid   = &info->lti_fid3;
1717         struct lu_fid                   *tfid   = &info->lti_fid4;
1718         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
1719         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1720         struct lmv_mds_md_v1            *clmv   = &llmv->ll_lmv;
1721         struct lmv_mds_md_v1            *plmv   = &info->lti_lmv;
1722         struct dt_object                *parent = NULL;
1723         int                              rc     = 0;
1724         ENTRY;
1725
1726         if (!lfsck_is_valid_slave_lmv(clmv)) {
1727                 rc = lfsck_namespace_trace_update(env, com, cfid,
1728                                         LNTF_UNCERTAIN_LMV, true);
1729
1730                 GOTO(out, rc);
1731         }
1732
1733         rc = dt_lookup(env, obj, (struct dt_rec *)pfid,
1734                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1735         if (rc != 0 || !fid_is_sane(pfid)) {
1736                 rc = lfsck_namespace_trace_update(env, com, cfid,
1737                                         LNTF_UNCERTAIN_LMV, true);
1738
1739                 GOTO(out, rc);
1740         }
1741
1742         parent = lfsck_object_find(env, lfsck, pfid);
1743         if (IS_ERR(parent)) {
1744                 rc = lfsck_namespace_trace_update(env, com, cfid,
1745                                         LNTF_UNCERTAIN_LMV, true);
1746
1747                 GOTO(out, rc);
1748         }
1749
1750         rc = lfsck_read_stripe_lmv(env, parent, plmv);
1751         if (rc != 0) {
1752                 int rc1;
1753
1754                 /* If the parent has no LMV EA, then it maybe because:
1755                  * 1) The parent lost the LMV EA.
1756                  * 2) The child claims a wrong (slave) LMV EA. */
1757                 if (rc == -ENODATA)
1758                         rc = lfsck_namespace_set_lmv_master(env, com, parent,
1759                                         clmv, cfid, clmv->lmv_master_mdt_index,
1760                                         LEF_SET_LMV_ALL);
1761                 else
1762                         rc = 0;
1763
1764                 rc1 = lfsck_namespace_trace_update(env, com, cfid,
1765                                                    LNTF_UNCERTAIN_LMV, true);
1766
1767                 GOTO(out, rc = (rc < 0 ? rc : rc1));
1768         }
1769
1770         /* Unmatched magic or stripe count. */
1771         if (unlikely(plmv->lmv_magic != LMV_MAGIC ||
1772                      plmv->lmv_stripe_count != clmv->lmv_stripe_count)) {
1773                 rc = lfsck_namespace_trace_update(env, com, cfid,
1774                                                   LNTF_UNCERTAIN_LMV, true);
1775
1776                 GOTO(out, rc);
1777         }
1778
1779         /* If the master hash type has been set as LMV_HASH_TYPE_UNKNOWN,
1780          * then the slave hash type is not important. */
1781         if ((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) ==
1782             LMV_HASH_TYPE_UNKNOWN &&
1783             plmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE)
1784                 GOTO(out, rc = 0);
1785
1786         /* Unmatched hash type. */
1787         if (unlikely((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) !=
1788                      (clmv->lmv_hash_type & LMV_HASH_TYPE_MASK))) {
1789                 rc = lfsck_namespace_trace_update(env, com, cfid,
1790                                                   LNTF_UNCERTAIN_LMV, true);
1791
1792                 GOTO(out, rc);
1793         }
1794
1795         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
1796                  PFID(cfid), clmv->lmv_master_mdt_index);
1797         name2 = info->lti_tmpbuf2;
1798
1799         rc = lfsck_links_get_first(env, obj, name, tfid);
1800         if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, tfid)) {
1801                 llmv->ll_lmv_verified = 1;
1802
1803                 GOTO(out, rc);
1804         }
1805
1806         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1807                        (const struct dt_key *)name2, BYPASS_CAPA);
1808         if (rc != 0 || !lu_fid_eq(cfid, tfid))
1809                 rc = lfsck_namespace_trace_update(env, com, cfid,
1810                                                   LNTF_UNCERTAIN_LMV, true);
1811         else
1812                 llmv->ll_lmv_verified = 1;
1813
1814         GOTO(out, rc);
1815
1816 out:
1817         if (parent != NULL && !IS_ERR(parent))
1818                 lfsck_object_put(env, parent);
1819
1820         return rc;
1821 }
1822
1823 /**
1824  * Double scan the striped directory or the shard.
1825  *
1826  * All the shards' under the given striped directory or its shard have
1827  * been scanned, the LFSCK has got the global knownledge about the LMV
1828  * EA consistency.
1829  *
1830  * If the target is one shard of a striped directory, then only needs to
1831  * update related tracing file.
1832  *
1833  * If the target is the master MDT-object of a striped directory, then the
1834  * LFSCK will make the decision about whether the master LMV EA is invalid
1835  * or not, and repair it if inconsistenct; for every shard of the striped
1836  * directory, whether the slave LMV EA is invalid or not, and repair it if
1837  * inconsistent.
1838  *
1839  * \param[in] env       pointer to the thread context
1840  * \param[in] com       pointer to the lfsck component
1841  * \param[in] lnr       pointer to the namespace request that contains the
1842  *                      striped directory or the shard
1843  *
1844  * \retval              zero for succeed
1845  * \retval              negative error number on failure
1846  */
1847 int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
1848                                        struct lfsck_component *com,
1849                                        struct lfsck_namespace_req *lnr)
1850 {
1851         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1852         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1853         struct lfsck_namespace          *ns     = com->lc_file_ram;
1854         struct lfsck_lmv                *llmv   = lnr->lnr_lmv;
1855         struct lmv_mds_md_v1            *lmv    = &llmv->ll_lmv;
1856         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1857         struct dt_object                *dir    = lnr->lnr_obj;
1858         const struct lu_fid             *pfid   = lfsck_dto2fid(dir);
1859         struct lu_seq_range             *range  = &info->lti_range;
1860         struct seq_server_site          *ss     =
1861                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1862         __u32                            stripe_count;
1863         __u32                            hash_type;
1864         int                              rc     = 0;
1865         int                              i;
1866         ENTRY;
1867
1868         if (llmv->ll_lmv_slave) {
1869                 if (llmv->ll_lmv_verified) {
1870                         ns->ln_striped_shards_scanned++;
1871                         lfsck_namespace_trace_update(env, com,
1872                                         lfsck_dto2fid(dir),
1873                                         LNTF_UNCERTAIN_LMV |
1874                                         LNTF_RECHECK_NAME_HASH, false);
1875                 }
1876
1877                 RETURN(0);
1878         }
1879
1880         /* Either the striped directory has been disabled or only part of
1881          * the striped directory have been scanned. The LFSCK cannot repair
1882          * something based on incompleted knowledge. So skip it. */
1883         if (llmv->ll_ignore || llmv->ll_exit_value <= 0)
1884                 RETURN(0);
1885
1886         /* There ever been some failure, as to the LFSCK cannot know whether
1887          * it has got the global knowledge about the LMV EA consistency or not,
1888          * so it cannot make reparation about the incompleted knowledge. */
1889         if (llmv->ll_failed) {
1890                 ns->ln_striped_dirs_scanned++;
1891                 ns->ln_striped_dirs_failed++;
1892
1893                 RETURN(0);
1894         }
1895
1896         if (lmv->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1897                 stripe_count = max(llmv->ll_max_filled_off + 1,
1898                                    llmv->ll_max_stripe_count);
1899         else
1900                 stripe_count = max(llmv->ll_max_filled_off + 1,
1901                                    lmv->lmv_stripe_count);
1902
1903         if (lmv->lmv_stripe_count != stripe_count) {
1904                 lmv->lmv_stripe_count = stripe_count;
1905                 llmv->ll_lmv_updated = 1;
1906         }
1907
1908         if (!lmv_is_known_hash_type(lmv->lmv_hash_type) &&
1909             !(lmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE) &&
1910             lmv_is_known_hash_type(llmv->ll_hash_type)) {
1911                 hash_type = llmv->ll_hash_type & LMV_HASH_TYPE_MASK;
1912                 lmv->lmv_hash_type = llmv->ll_hash_type;
1913                 llmv->ll_lmv_updated = 1;
1914         } else {
1915                 hash_type = lmv->lmv_hash_type & LMV_HASH_TYPE_MASK;
1916                 if (!lmv_is_known_hash_type(hash_type))
1917                         hash_type = LMV_HASH_TYPE_UNKNOWN;
1918         }
1919
1920         if (llmv->ll_lmv_updated) {
1921                 lmv->lmv_layout_version++;
1922                 rc = lfsck_namespace_update_lmv(env, com, dir, lmv, false);
1923                 if (rc != 0)
1924                         RETURN(rc);
1925
1926                 ns->ln_striped_dirs_scanned++;
1927                 ns->ln_striped_dirs_repaired++;
1928         }
1929
1930         fld_range_set_mdt(range);
1931         for (i = 0; i <= llmv->ll_max_filled_off; i++) {
1932                 struct dt_object *obj = NULL;
1933                 struct lfsck_slave_lmv_rec *lslr = llmv->ll_lslr + i;
1934                 const struct lu_fid *cfid = &lslr->lslr_fid;
1935                 const struct lu_name *cname;
1936                 struct linkea_data ldata = { 0 };
1937                 int len;
1938                 int rc1 = 0;
1939                 bool repair_linkea = false;
1940                 bool repair_lmvea = false;
1941                 bool rename = false;
1942                 bool create = false;
1943                 bool linkea_repaired = false;
1944                 bool lmvea_repaired = false;
1945                 bool rename_repaired = false;
1946                 bool create_repaired = false;
1947
1948                 /* LMV EA hole. */
1949                 if (fid_is_zero(cfid))
1950                         continue;
1951
1952                 len = snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1953                                DFID":%u", PFID(cfid), i);
1954                 cname = lfsck_name_get_const(env, info->lti_tmpbuf, len);
1955                 memcpy(lnr->lnr_name, info->lti_tmpbuf, len);
1956
1957                 obj = lfsck_object_find_bottom(env, lfsck, cfid);
1958                 if (IS_ERR(obj)) {
1959                         rc1 = PTR_ERR(obj);
1960                         goto next;
1961                 }
1962
1963                 switch (lslr->lslr_flags) {
1964                 case LSLF_NONE:
1965                         if (llmv->ll_inline ||
1966                             lslr->lslr_stripe_count != stripe_count ||
1967                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
1968                              hash_type)
1969                                 repair_lmvea = true;
1970                         break;
1971                 case LSLF_BAD_INDEX2:
1972                         /* The index in the slave LMV EA is right,
1973                          * the name entry should be updated. */
1974                         rename = true;
1975                         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2),
1976                                  DFID":%u", PFID(cfid), lslr->lslr_index);
1977                         if (llmv->ll_inline ||
1978                             lslr->lslr_stripe_count != stripe_count ||
1979                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
1980                              hash_type)
1981                                 repair_lmvea = true;
1982                         break;
1983                 case LSLF_BAD_INDEX1:
1984                         /* The index in the name entry is right,
1985                          * the slave LMV EA should be updated. */
1986                 case LSLF_NO_LMVEA:
1987                         repair_lmvea = true;
1988                         break;
1989                 case LSLF_DANGLING:
1990                         create = true;
1991                         goto repair;
1992                 default:
1993                         break;
1994                 }
1995
1996                 rc1 = lfsck_links_read(env, obj, &ldata);
1997                 if (rc1 == -ENOENT) {
1998                         create = true;
1999                         goto repair;
2000                 }
2001
2002                 if (rc1 == -EINVAL || rc1 == -ENODATA) {
2003                         repair_linkea = true;
2004                         goto repair;
2005                 }
2006
2007                 if (rc1 != 0)
2008                         goto next;
2009
2010                 if (ldata.ld_leh->leh_reccount != 1) {
2011                         repair_linkea = true;
2012                         goto repair;
2013                 }
2014
2015                 rc1 = linkea_links_find(&ldata, cname, pfid);
2016                 if (rc1 != 0)
2017                         repair_linkea = true;
2018
2019 repair:
2020                 if (create) {
2021                         rc1 = lfsck_namespace_repair_dangling(env, com,
2022                                                               obj, lnr);
2023                         if (rc1 >= 0) {
2024                                 create_repaired = true;
2025                                 if (rc == 0)
2026                                         ns->ln_dangling_repaired++;
2027                         }
2028                 }
2029
2030                 if (repair_lmvea) {
2031                         *lmv2 = *lmv;
2032                         lmv2->lmv_magic = LMV_MAGIC_STRIPE;
2033                         lmv2->lmv_stripe_count = stripe_count;
2034                         lmv2->lmv_master_mdt_index = i;
2035                         lmv2->lmv_hash_type = hash_type;
2036
2037                         rc1 = lfsck_namespace_update_lmv(env, com, obj,
2038                                                          lmv2, false);
2039                         if (rc1 < 0)
2040                                 goto next;
2041
2042                         if (dt_object_remote(obj)) {
2043                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2044                                         fid_seq(lfsck_dto2fid(obj)), range);
2045                                 if (rc1 != 0)
2046                                         goto next;
2047
2048                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2049                                                 com, obj, LE_SET_LMV_SLAVE, 0,
2050                                                 range->lsr_index);
2051                         } else {
2052                                 ns->ln_striped_shards_repaired++;
2053                                 rc1 = lfsck_namespace_trace_update(env, com,
2054                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2055                         }
2056
2057                         if (rc1 < 0)
2058                                 goto next;
2059
2060                         if (rc1 >= 0)
2061                                 lmvea_repaired = true;
2062                 } else if (llmv->ll_inline) {
2063                         if (dt_object_remote(obj)) {
2064                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2065                                         fid_seq(lfsck_dto2fid(obj)), range);
2066                                 if (rc1 != 0)
2067                                         goto next;
2068
2069                                 /* The slave LMV EA on the remote shard is
2070                                  * correct, just notify the LFSCK instance
2071                                  * on such MDT to re-verify the name_hash. */
2072                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2073                                                 com, obj, LE_SET_LMV_SLAVE,
2074                                                 LEF_RECHECK_NAME_HASH,
2075                                                 range->lsr_index);
2076                         } else {
2077                                 rc1 = lfsck_namespace_trace_update(env, com,
2078                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2079                         }
2080
2081                         if (rc1 < 0)
2082                                 goto next;
2083                 }
2084
2085                 if (rename) {
2086                         rc1 = lfsck_namespace_repair_dirent(env, com, dir, obj,
2087                                         info->lti_tmpbuf2, lnr->lnr_name,
2088                                         lnr->lnr_type, true, false);
2089                         if (rc1 >= 0) {
2090                                 rename_repaired = true;
2091                                 if (rc1 > 0) {
2092                                         ns->ln_dirent_repaired++;
2093                                         rc1 = lfsck_namespace_trace_update(env,
2094                                                 com, cfid,
2095                                                 LNTF_RECHECK_NAME_HASH, true);
2096                                 }
2097                         }
2098
2099                         if (rc1 < 0)
2100                                 goto next;
2101                 }
2102
2103                 if (repair_linkea) {
2104                         struct lustre_handle lh = { 0 };
2105
2106                         rc1 = linkea_data_new(&ldata, &info->lti_big_buf);
2107                         if (rc1 != 0)
2108                                 goto next;
2109
2110                         rc1 = linkea_add_buf(&ldata, cname, lfsck_dto2fid(dir));
2111                         if (rc1 != 0)
2112                                 goto next;
2113
2114                         rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
2115                                                MDS_INODELOCK_UPDATE |
2116                                                MDS_INODELOCK_XATTR, LCK_EX);
2117                         lfsck_ibits_unlock(&lh, LCK_EX);
2118                         if (rc1 != 0)
2119                                 goto next;
2120
2121                         rc1 = lfsck_namespace_rebuild_linkea(env, com, obj,
2122                                                              &ldata);
2123                         if (rc1 >= 0) {
2124                                 linkea_repaired = true;
2125                                 if (rc1 > 0)
2126                                         ns->ln_linkea_repaired++;
2127                         }
2128                 }
2129
2130 next:
2131                 CDEBUG(D_LFSCK, "%s: namespace LFSCK repair the shard "
2132                       "%d "DFID" of the striped directory "DFID" with "
2133                       "dangling %s/%s, rename %s/%s, llinkea %s/%s, "
2134                       "repair_lmvea %s/%s: rc = %d\n", lfsck_lfsck2name(lfsck),
2135                       i, PFID(cfid), PFID(&lnr->lnr_fid),
2136                       create ? "yes" : "no", create_repaired ? "yes" : "no",
2137                       rename ? "yes" : "no", rename_repaired ? "yes" : "no",
2138                       repair_linkea ? "yes" : "no",
2139                       linkea_repaired ? "yes" : "no",
2140                       repair_lmvea ? "yes" : "no",
2141                       lmvea_repaired ? "yes" : "no", rc1);
2142
2143                 if (obj != NULL && !IS_ERR(obj))
2144                         lfsck_object_put(env, obj);
2145
2146                 if (rc1 < 0) {
2147                         rc = rc1;
2148                         ns->ln_striped_shards_failed++;
2149                 }
2150         }
2151
2152         RETURN(rc);
2153 }
2154
2155 /**
2156  * Verify the shard's name entry under the striped directory.
2157  *
2158  * Before all shards of the striped directory scanned, the LFSCK cannot
2159  * know whether the master LMV EA is valid or not, and also cannot know
2160  * how to repair an invalid shard exactly. For example, the stripe index
2161  * stored in the shard's name does not match the stripe index stored in
2162  * the slave LMV EA, then the LFSCK cannot know which one is correct.
2163  * If the LFSCK just assumed one is correct, and fixed the other, then
2164  * as the LFSCK processing, it may find that the former reparation is
2165  * wrong and have to roll back. Unfortunately, if some applications saw
2166  * the changes and made further modification based on such changes, then
2167  * the roll back is almost impossible.
2168  *
2169  * To avoid above trouble, the LFSCK will scan the master object of the
2170  * striped directory twice, that is NOT the same as normal two-stages
2171  * scanning, the double scanning the striped directory will happen both
2172  * during the first-stage scanning:
2173  *
2174  * 1) When the striped directory is opened for scanning, the LFSCK will
2175  *    iterate each shard in turn, and records its slave LMV EA in the
2176  *    lfsck_lmv::ll_lslr. In this step, if the 'shard' (may be fake
2177  *    shard) name does not match the shard naming rule, for example, it
2178  *    does not contains the shard's FID, or not contains index, then we
2179  *    can remove the bad name entry directly. But if the name is valid,
2180  *    but the shard has no slave LMV EA or the slave LMV EA does not
2181  *    match its name, then we just record related information in the
2182  *    lfsck_lmv::ll_lslr in RAM.
2183  *
2184  * 2) When all the known shards have been scanned, then the engine will
2185  *    generate a dummy request (via lfsck_namespace_close_dir) to tell
2186  *    the assistant thread that all the known shards have been scanned.
2187  *    Since the assistant has got the global knowledge about the index
2188  *    conflict, stripe count, hash type, and so on. Then the assistant
2189  *    thread will scan the lfsck_lmv::ll_lslr, and for every shard in
2190  *    the record, check and repair inconsistency.
2191  *
2192  * Generally, the stripe directory has only several shards, and there
2193  * will NOT be a lof of striped directory. So double scanning striped
2194  * directory will not much affect the LFSCK performance.
2195  *
2196  * \param[in] env       pointer to the thread context
2197  * \param[in] com       pointer to the lfsck component
2198  * \param[in] lnr       pointer to the namespace request that contains the
2199  *                      shard's name, parent object, parent's LMV, and ect.
2200  *
2201  * \retval              zero for succeed
2202  * \retval              negative error number on failure
2203  */
2204 int lfsck_namespace_handle_striped_master(const struct lu_env *env,
2205                                           struct lfsck_component *com,
2206                                           struct lfsck_namespace_req *lnr)
2207 {
2208         struct lfsck_thread_info   *info        = lfsck_env_info(env);
2209         struct lmv_mds_md_v1       *lmv         = &info->lti_lmv;
2210         struct lfsck_instance      *lfsck       = com->lc_lfsck;
2211         struct lfsck_namespace     *ns          = com->lc_file_ram;
2212         struct lfsck_lmv           *llmv        = lnr->lnr_lmv;
2213         struct dt_object           *dir         = lnr->lnr_obj;
2214         const struct lu_fid        *pfid        = lfsck_dto2fid(dir);
2215         struct dt_object           *obj         = NULL;
2216         struct dt_device           *dev         = NULL;
2217         int                         shard_idx   = 0;
2218         int                         stripe      = 0;
2219         int                         rc          = 0;
2220         int                         depth       = 0;
2221         bool                        repaired    = false;
2222         enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
2223         ENTRY;
2224
2225         if (unlikely(llmv->ll_ignore))
2226                 RETURN(0);
2227
2228         shard_idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
2229         if (shard_idx < 0)
2230                 GOTO(fail_lmv, rc = shard_idx);
2231
2232         if (shard_idx == lfsck_dev_idx(lfsck->li_bottom)) {
2233                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
2234                         GOTO(out, rc = 0);
2235
2236                 dev = lfsck->li_next;
2237         } else {
2238                 struct lfsck_tgt_desc *ltd;
2239
2240                 /* Usually, some local filesystem consistency verification
2241                  * tools can guarantee the local namespace tree consistenct.
2242                  * So the LFSCK will only verify the remote directory. */
2243                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
2244                         rc = lfsck_namespace_trace_update(env, com, pfid,
2245                                                 LNTF_CHECK_PARENT, true);
2246
2247                         GOTO(out, rc);
2248                 }
2249
2250                 ltd = LTD_TGT(&lfsck->li_mdt_descs, shard_idx);
2251                 if (unlikely(ltd == NULL)) {
2252                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
2253                                "did not join the namespace LFSCK\n",
2254                                lfsck_lfsck2name(lfsck), shard_idx);
2255                         lfsck_lad_set_bitmap(env, com, shard_idx);
2256
2257                         GOTO(fail_lmv, rc = -ENODEV);
2258                 }
2259
2260                 dev = ltd->ltd_tgt;
2261         }
2262
2263         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
2264         if (IS_ERR(obj))
2265                 GOTO(fail_lmv, rc = PTR_ERR(obj));
2266
2267         if (!dt_object_exists(obj)) {
2268                 stripe = lfsck_shard_name_to_index(env, lnr->lnr_name,
2269                                 lnr->lnr_namelen, lnr->lnr_type, &lnr->lnr_fid);
2270                 if (stripe < 0) {
2271                         type = LNIT_BAD_DIRENT;
2272
2273                         GOTO(out, rc = 0);
2274                 }
2275
2276 dangling:
2277                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
2278                 if (rc == 0)
2279                         rc = lfsck_record_lmv(env, com, lnr, NULL, stripe,
2280                                               LSLF_DANGLING, LSLF_NONE, &depth);
2281
2282                 GOTO(out, rc);
2283         }
2284
2285         stripe = lfsck_shard_name_to_index(env, lnr->lnr_name, lnr->lnr_namelen,
2286                                            lfsck_object_type(obj),
2287                                            &lnr->lnr_fid);
2288         if (stripe < 0) {
2289                 type = LNIT_BAD_DIRENT;
2290
2291                 GOTO(out, rc = 0);
2292         }
2293
2294         rc = lfsck_read_stripe_lmv(env, obj, lmv);
2295         if (unlikely(rc == -ENOENT))
2296                 /* It may happen when the remote object has been removed,
2297                  * but the local MDT does not aware of that. */
2298                 goto dangling;
2299
2300         if (rc == -ENODATA)
2301                 rc = lfsck_record_lmv(env, com, lnr, lmv, stripe,
2302                                       LSLF_NO_LMVEA, LSLF_NONE, &depth);
2303         else if (rc == 0)
2304                 rc = lfsck_record_lmv(env, com, lnr, lmv, stripe,
2305                                       lmv->lmv_master_mdt_index != stripe ?
2306                                       LSLF_BAD_INDEX1 : LSLF_NONE, LSLF_NONE,
2307                                       &depth);
2308
2309         GOTO(out, rc);
2310
2311 fail_lmv:
2312         llmv->ll_failed = 1;
2313
2314 out:
2315         if (rc >= 0 && type == LNIT_NONE && !S_ISDIR(lnr->lnr_type))
2316                 type = LNIT_BAD_TYPE;
2317
2318         switch (type) {
2319         case LNIT_BAD_TYPE:
2320                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2321                                                    lnr->lnr_name, lnr->lnr_name,
2322                                                    lnr->lnr_type, true, false);
2323                 if (rc > 0)
2324                         repaired = true;
2325                 break;
2326         case LNIT_BAD_DIRENT:
2327                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2328                                                    lnr->lnr_name, lnr->lnr_name,
2329                                                    lnr->lnr_type, false, false);
2330                 if (rc > 0)
2331                         repaired = true;
2332                 break;
2333         default:
2334                 break;
2335         }
2336
2337         if (rc < 0) {
2338                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
2339                        "the shard: "DFID", parent "DFID", name %.*s: rc = %d\n",
2340                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
2341                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
2342                        lnr->lnr_namelen, lnr->lnr_name, rc);
2343
2344                 if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
2345                      rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
2346                      rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
2347                     dev != NULL && dev != lfsck->li_next)
2348                         lfsck_lad_set_bitmap(env, com, shard_idx);
2349
2350                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
2351                         rc = 0;
2352         } else {
2353                 if (repaired) {
2354                         ns->ln_items_repaired++;
2355
2356                         switch (type) {
2357                         case LNIT_BAD_TYPE:
2358                                 ns->ln_bad_type_repaired++;
2359                                 break;
2360                         case LNIT_BAD_DIRENT:
2361                                 ns->ln_dirent_repaired++;
2362                                 break;
2363                         default:
2364                                 break;
2365                         }
2366                 }
2367
2368                 rc = 0;
2369         }
2370
2371         if (obj != NULL && !IS_ERR(obj))
2372                 lfsck_object_put(env, obj);
2373
2374         return rc;
2375 }