Whamcloud - gitweb
709d4a211b0ce3119565a416fbf907e28d805e5f
[fs/lustre-release.git] / lustre / lfsck / lfsck_striped_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_striped_dir.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 /*
32  * About the verification for striped directory. Some rules and assumptions:
33  *
34  * 1) lmv_magic: The magic may be wrong. But it is almost impossible (1/2^32
35  *    probability) that a master LMV EA wrongly claims to be a slave LMV EA,
36  *    so we can ignore such a rare case and the reverse case.
37  *
38  * 2) lmv_master_mdt_index: The master index can be self-verified by comparing
39  *    it with the MDT index directly. The slave stripe index can be verified by
40  *    comparing it with the file name. Although both the name entry and the LMV
41  *    EA can be wrong, it is almost impossible that they hit the same bad data.
42  *    So if they match each other, then trust them. Similarly, for the shard,
43  *    it stores index in both slave LMV EA and in linkEA, if the two copies
44  *    match, then trust them.
45  *
46  * 3) lmv_hash_type: The valid hash type should be LMV_HASH_TYPE_ALL_CHARS or
47  *    LMV_HASH_TYPE_FNV_1A_64. If the LFSCK instance on some slave finds that
48  *    the name hash against the hash function does not match the MDT, then it
49  *    will change the master LMV EA hash type as LMV_HASH_TYPE_UNKNOWN. With
50  *    such hash type, the whole striped directory still can be accessed via
51  *    lookup/readdir, and also support unlink, but cannot add new name entry.
52  *
53  * 3.1) If the master hash type is one of the valid values, then trust the
54  *      master LMV EA. Because:
55  *
56  * 3.1.1) The master hash type is visible to the client and used by the client.
57  *
58  * 3.1.2) For a given name, different hash types may map the name entry to the
59  *        same MDT. So simply checking one name entry or some name entries
60  *        cannot verify whether the hash type is correct or not.
61  *
62  * 3.1.3) Different shards can claim different hash types, it is not easy to
63  *        distinguish which ones are correct. Even though the master is wrong,
64  *        as the LFSCK processing, some LFSCK instance on other MDT may find
65  *        unmatched name hash, then it will change the master hash type to
66  *        LMV_HASH_TYPE_UNKNOWN as described above. The worst case is equal
67  *        to the case without the LFSCK.
68  *
69  * 3.2) If the master hash type is neither valid nor LMV_HASH_TYPE_UNKNOWN, then
70  *      trust the first shard with valid hash type (ALL_CHARS or FNV_1A_64).
71  *      If the shard is also wrong, which means a double failure, then as
72  *      the LFSCK processing, other LFSCK instances on the other MDTs may
73  *      find unmatched name hash, and then, the master hash type will be
74  *      changed to LMV_HASH_TYPE_UNKNOWN as described in the 3).
75  *
76  * 3.3) If the master hash type is LMV_HASH_TYPE_UNKNOWN, then it is possible
77  *      that some other LFSCK instance on other MDT found bad name hash, then
78  *      changed the master hash type to LMV_HASH_TYPE_UNKNOWN as described in
79  *      the 3). But it also maybe because of data corruption in master LMV EA.
80  *      To make such two cases to be distinguishable, when the LFSCK changes
81  *      the master hash type to LMV_HASH_TYPE_UNKNOWN, it will mark in the
82  *      master LMV EA (new lmv flags LMV_HASH_FLAG_BAD_TYPE). Then subsequent
83  *      LFSCK checking can distinguish them: for the former case, trust the master
84  *      LMV EA with nothing to be done; otherwise, trust the first shard with
85  *      valid hash type (ALL_CHARS or FNV_1A_64) as the 3.2) does.
86  *
87  * 4) lmv_stripe_count: For a shard of a striped directory, if its index has
88  *    been verified as the 2), then the stripe count must be larger than its
89  *    index. For the master object, by scanning each shard's index, the LFSCK
90  *    can know the highest index, and the stripe count must be larger than the
91  *    known highest index. If the stripe count in the LMV EA matches above two
92  *    rules, then it may be trustable. If both the master claimed stripe
93  *    count and the slave claimed stripe count match each own rule, but they
94  *    are not the same, then trust the master. Because the stripe count in
95  *    the master LMV EA is visible to client and used to distribute the name
96  *    entry to some shard, but the slave LMV EA is only used for verification
97  *    and invisible to client.
98  *
99  * 5) If the master LMV EA is lost, then there are two possible cases:
100  *
101  * 5.1) The slave claims slave LMV EA by wrong, means that the parent was not
102  *      a striped directory, but its sub-directory has a wrong slave LMV EA.
103  *      It is a very rare case, similar to the 1), and can be ignored.
104  *
105  * 5.2) The parent directory is a striped directory, but the master LMV EA
106  *      is lost or crashed. Then the LFSCK needs to re-generate the master
107  *      LMV EA: the lmv_master_mdt_index is from the MDT device index; the
108  *      lmv_hash_type is from the first valid shard; the lmv_stripe_count
109  *      will be calculated via scanning all the shards.
110  *
111  * 5.2.1) Before re-generating the master LMV EA, the LFSCK needs to check
112  *        whether someone has created some file(s) under the master object
113  *        after the master LMV EA disappeared. If yes, the LFSCK cannot
114  *        re-generate the master LMV EA, otherwise, such new created files
115  *        will be invisible to client. Under such case, the LFSCK will mark
116  *        the master object as read only (without master LMV EA). Then all
117  *        things under the master MDT-object, including those new created
118  *        files and the shards themselves, will be visible to client. And
119  *        then the administrator can handle the bad striped directory with
120  *        more human knowledge.
121  *
122  * 5.2.2) If someone created some special sub-directory under the master
123  *        MDT-object with the same naming rule as shard name $FID:$index,
124  *        as to the LFSCK cannot detect it before re-generating the master
125  *        LMV EA, then such sub-directory itself will be invisible after
126  *        the LFSCK re-generating the master LMV EA. The sub-items under
127  *        such sub-directory are still visible to client. As the LFSCK
128  *        processing, if such sub-directory cause some conflict with other
129  *        normal shard, such as the index conflict, then the LFSCK will
130  *        remove the master LMV EA and change the master MDT-object to
131  *        read-only mode as the 5.2.1). But if there is no conflict, the
132  *        LFSCK will regard such sub-directory as a striped shard that
133  *        lost its slave LMV EA, and will re-generate slave LMV EA for it.
134  *
135  * 5.2.3) Anytime, if the LFSCK found some shards name/index conflict,
136  *        and cannot distinguish which one is right, then it
137  *        will remove the master LMV EA and change the MDT-object to
138  *        read-only mode as the 5.2.2).
139  */
140
141 #define DEBUG_SUBSYSTEM S_LFSCK
142
143 #include <lustre/lustre_idl.h>
144 #include <lu_object.h>
145 #include <dt_object.h>
146 #include <md_object.h>
147 #include <lustre_fid.h>
148 #include <lustre_lib.h>
149 #include <lustre_net.h>
150 #include <lustre_lmv.h>
151 #include <lustre/lustre_user.h>
152
153 #include "lfsck_internal.h"
154
155 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
156 {
157         if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
158                 if (llmv->ll_inline) {
159                         struct lfsck_lmv_unit   *llu;
160                         struct lfsck_instance   *lfsck;
161
162                         llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
163                         lfsck = llu->llu_lfsck;
164
165                         spin_lock(&lfsck->li_lock);
166                         list_del(&llu->llu_link);
167                         spin_unlock(&lfsck->li_lock);
168
169                         lfsck_object_put(env, llu->llu_obj);
170
171                         LASSERT(llmv->ll_lslr != NULL);
172
173                         OBD_FREE_LARGE(llmv->ll_lslr,
174                                        sizeof(*llmv->ll_lslr) *
175                                        llmv->ll_stripes_allocated);
176                         OBD_FREE_PTR(llu);
177                 } else {
178                         if (llmv->ll_lslr != NULL)
179                                 OBD_FREE_LARGE(llmv->ll_lslr,
180                                         sizeof(*llmv->ll_lslr) *
181                                         llmv->ll_stripes_allocated);
182
183                         OBD_FREE_PTR(llmv);
184                 }
185         }
186 }
187
188 /**
189  * Mark the specified directory as read-only by set LUSTRE_IMMUTABLE_FL.
190  *
191  * The caller has taken the ldlm lock on the @obj already.
192  *
193  * \param[in] env       pointer to the thread context
194  * \param[in] com       pointer to the lfsck component
195  * \param[in] obj       pointer to the object to be handled
196  * \param[in] del_lmv   true if need to drop the LMV EA
197  *
198  * \retval              positive number if nothing to be done
199  * \retval              zero for succeed
200  * \retval              negative error number on failure
201  */
202 static int lfsck_disable_master_lmv(const struct lu_env *env,
203                                     struct lfsck_component *com,
204                                     struct dt_object *obj, bool del_lmv)
205 {
206         struct lfsck_thread_info        *info   = lfsck_env_info(env);
207         struct lu_attr                  *la     = &info->lti_la;
208         struct lfsck_instance           *lfsck  = com->lc_lfsck;
209         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
210         struct thandle                  *th     = NULL;
211         int                              rc     = 0;
212         ENTRY;
213
214         th = dt_trans_create(env, dev);
215         if (IS_ERR(th))
216                 GOTO(log, rc = PTR_ERR(th));
217
218         if (del_lmv) {
219                 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
220                 if (rc != 0)
221                         GOTO(stop, rc);
222         }
223
224         la->la_valid = LA_FLAGS;
225         rc = dt_declare_attr_set(env, obj, la, th);
226         if (rc != 0)
227                 GOTO(stop, rc);
228
229         rc = dt_trans_start_local(env, dev, th);
230         if (rc != 0)
231                 GOTO(stop, rc);
232
233         dt_write_lock(env, obj, 0);
234         if (unlikely(lfsck_is_dead_obj(obj)))
235                 GOTO(unlock, rc = 1);
236
237         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
238                 GOTO(unlock, rc = 0);
239
240         if (del_lmv) {
241                 rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th, BYPASS_CAPA);
242                 if (rc != 0)
243                         GOTO(unlock, rc);
244         }
245
246         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
247         if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
248                 la->la_valid = LA_FLAGS;
249                 la->la_flags |= LUSTRE_IMMUTABLE_FL;
250                 rc = dt_attr_set(env, obj, la, th, BYPASS_CAPA);
251         }
252
253         GOTO(unlock, rc);
254
255 unlock:
256         dt_write_unlock(env, obj);
257
258 stop:
259         dt_trans_stop(env, dev, th);
260
261 log:
262         CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
263                "the striped directory "DFID" as read-only: rc = %d\n",
264                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
265
266         if (rc <= 0) {
267                 struct lfsck_namespace *ns = com->lc_file_ram;
268
269                 ns->ln_flags |= LF_INCONSISTENT;
270                 if (rc == 0)
271                         ns->ln_striped_dirs_disabled++;
272         }
273
274         return rc;
275 }
276
277 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
278 {
279         return lmv->lmv_stripe_count >= 1 &&
280                lmv->lmv_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
281                lmv->lmv_stripe_count > lmv->lmv_master_mdt_index &&
282                lmv_is_known_hash_type(lmv->lmv_hash_type);
283 }
284
285 /**
286  * Remove the striped directory's master LMV EA and mark it as read-only.
287  *
288  * Take ldlm lock on the striped directory before calling the
289  * lfsck_disable_master_lmv().
290  *
291  * \param[in] env       pointer to the thread context
292  * \param[in] com       pointer to the lfsck component
293  * \param[in] lnr       pointer to the namespace request that contains the
294  *                      striped directory to be handled and other information
295  *
296  * \retval              positive number if nothing to be done
297  * \retval              zero for succeed
298  * \retval              negative error number on failure
299  */
300 static int lfsck_remove_lmv(const struct lu_env *env,
301                             struct lfsck_component *com,
302                             struct lfsck_namespace_req *lnr)
303 {
304         struct dt_object        *obj    = lnr->lnr_obj;
305         struct lustre_handle     lh     = { 0 };
306         int                      rc;
307
308         lnr->lnr_lmv->ll_ignore = 1;
309         rc = lfsck_ibits_lock(env, com->lc_lfsck, obj, &lh,
310                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
311                               LCK_EX);
312         if (rc == 0) {
313                 rc = lfsck_disable_master_lmv(env, com, obj, true);
314                 lfsck_ibits_unlock(&lh, LCK_EX);
315         }
316
317         return rc;
318 }
319
320 /**
321  * Remove the name entry from the striped directory's master MDT-object.
322  *
323  * \param[in] env       pointer to the thread context
324  * \param[in] com       pointer to the lfsck component
325  * \param[in] dir       pointer to the striped directory
326  * \param[in] fid       the shard's FID which name entry will be removed
327  * \param[in] index     the shard's index which name entry will be removed
328  *
329  * \retval              positive number for repaired successfully
330  * \retval              0 if nothing to be repaired
331  * \retval              negative error number on failure
332  */
333 static int lfsck_remove_dirent(const struct lu_env *env,
334                                struct lfsck_component *com,
335                                struct dt_object *dir,
336                                const struct lu_fid *fid, __u32 index)
337 {
338         struct lfsck_thread_info        *info = lfsck_env_info(env);
339         struct dt_object                *obj;
340         int                              rc;
341
342         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
343                  PFID(fid), index);
344         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, fid);
345         if (IS_ERR(obj))
346                 return PTR_ERR(obj);
347
348         rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
349                                         info->lti_tmpbuf2, info->lti_tmpbuf2,
350                                         S_IFDIR, false, false);
351         lfsck_object_put(env, obj);
352         if (rc > 0) {
353                 struct lfsck_namespace *ns = com->lc_file_ram;
354
355                 ns->ln_dirent_repaired++;
356         }
357
358         return rc;
359 }
360
361 /**
362  * Remove old shard's name entry and refill the @lslr slot with new shard.
363  *
364  * Some old shard held the specified @lslr slot, but it is an invalid shard.
365  * This function will remove the bad shard's name entry, and refill the @lslr
366  * slot with the new shard.
367  *
368  * \param[in] env       pointer to the thread context
369  * \param[in] com       pointer to the lfsck component
370  * \param[in] lslr      pointer to lfsck_disable_master_lmv slot which content
371  *                      will be replaced by the given information
372  * \param[in] lnr       contain the shard's FID to be used to fill the
373  *                      @lslr slot, it also records the known max filled index
374  *                      and the known max stripe count
375  * \param[in] lmv       contain the slave LMV EA to be used to fill the
376  *                      @lslr slot
377  * \param[in] index     the old shard's index in the striped directory
378  * \param[in] flags     the new shard's flags in the @lslr slot
379  *
380  * \retval              zero for succeed
381  * \retval              negative error number on failure
382  */
383 static int lfsck_replace_lmv(const struct lu_env *env,
384                              struct lfsck_component *com,
385                              struct lfsck_slave_lmv_rec *lslr,
386                              struct lfsck_namespace_req *lnr,
387                              struct lmv_mds_md_v1 *lmv,
388                              __u32 index, __u32 flags)
389 {
390         struct lfsck_lmv *llmv = lnr->lnr_lmv;
391         int               rc;
392
393         rc = lfsck_remove_dirent(env, com, lnr->lnr_obj,
394                                  &lslr->lslr_fid, index);
395         if (rc < 0)
396                 return rc;
397
398         lslr->lslr_fid = lnr->lnr_fid;
399         lslr->lslr_flags = flags;
400         lslr->lslr_stripe_count = lmv->lmv_stripe_count;
401         lslr->lslr_index = lmv->lmv_master_mdt_index;
402         lslr->lslr_hash_type = lmv->lmv_hash_type;
403         if (flags == LSLF_NONE) {
404                 if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
405                     lmv_is_known_hash_type(lmv->lmv_hash_type))
406                         llmv->ll_hash_type = lmv->lmv_hash_type;
407
408                 if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
409                     llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
410                         llmv->ll_max_stripe_count = lslr->lslr_stripe_count;
411         }
412
413         return 0;
414 }
415
416 /**
417  * Record the slave LMV EA in the lfsck_lmv::ll_lslr.
418  *
419  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is free,
420  * then fill the slot with the given @lnr/@lmv/@flags directly (maybe need to
421  * extend the lfsck_lmv::ll_lslr buffer).
422  *
423  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is taken
424  * by other shard, then the LFSCK will try to resolve the conflict by checking
425  * the two conflict shards' flags, and try other possible slot (if one of them
426  * claims another possible @shard_idx).
427  *
428  * 1) If one of the two conflict shards can be recorded in another slot, then
429  *    it is OK, go ahead. Otherwise,
430  *
431  * 2) If one of them is dangling name entry, then remove (one of) the dangling
432  *    name entry (and replace related @lslr slot if needed). Otherwise,
433  *
434  * 3) If one of them has no slave LMV EA, then check whether the master LMV
435  *    EA has ever been lost and re-generated (LMV_HASH_FLAG_LOST_LMV in the
436  *    master LMV EA).
437  *
438  * 3.1) If yes, then it is possible that such object is not a real shard of
439  *      the striped directory, instead, it was created by someone after the
440  *      master LMV EA lost with the name that matches the shard naming rule.
441  *      Then the LFSCK will remove the master LMV EA and mark the striped
442  *      directory as read-only to allow those non-shard files to be visible
443  *      to client.
444  *
445  * 3.2) If no, then remove (one of) the object what has no slave LMV EA.
446  *
447  * 4) If all above efforts cannot work, then the LFSCK cannot know how to
448  *    recover the striped directory. To make the administrator can see the
449  *    conflicts, the LFSCK will remove the master LMV EA and mark the striped
450  *    directory as read-only.
451  *
452  * This function may be called recursively, to prevent overflow, we define
453  * LFSCK_REC_LMV_MAX_DEPTH to restrict the recursive call depth.
454  *
455  * \param[in] env       pointer to the thread context
456  * \param[in] com       pointer to the lfsck component
457  * \param[in] lnr       contain the shard's FID to fill the @lslr slot,
458  *                      it also records the known max filled index and
459  *                      the known max stripe count
460  * \param[in] lmv       pointer to the slave LMV EA to be recorded
461  * \param[in] shard_idx the shard's index used for locating the @lslr slot,
462  *                      it can be the index stored in the shard's name,
463  *                      it also can be the index stored in the slave LMV EA
464  *                      (for recursive case)
465  * \param[in] flags     the shard's flags to be recorded in the @lslr slot
466  *                      to indicate the shard status, such as whether has
467  *                      slave LMV EA, whether dangling name entry, whether
468  *                      the name entry and slave LMV EA unmatched, etc.
469  * \param[in] flags2    when called recursively, the @flags2 tells the
470  *                      former conflict shard's flags in the @lslr slot.
471  * \param[in,out] depth to prevent recursing too deeply, we define the
472  *                      max depth that can be called recursively
473  *                      (LFSCK_REC_LMV_MAX_DEPTH)
474  *
475  * \retval              zero for succeed
476  * \retval              "-ERANGE" for invalid @shard_idx
477  * \retval              "-EEXIST" for the required lslr slot has been
478  *                      occupied by other shard
479  * \retval              other negative error number on failure
480  */
481 static int lfsck_record_lmv(const struct lu_env *env,
482                             struct lfsck_component *com,
483                             struct lfsck_namespace_req *lnr,
484                             struct lmv_mds_md_v1 *lmv, __u32 shard_idx,
485                             __u32 flags, __u32 flags2, __u32 *depth)
486 {
487         struct lfsck_instance      *lfsck = com->lc_lfsck;
488         struct lfsck_lmv           *llmv  = lnr->lnr_lmv;
489         struct dt_object           *dir   = lnr->lnr_obj;
490         const struct lu_fid        *fid   = &lnr->lnr_fid;
491         struct lfsck_slave_lmv_rec *lslr;
492         struct lfsck_rec_lmv_save  *lrls;
493         int                         index = shard_idx;
494         int                         rc    = 0;
495         ENTRY;
496
497         CDEBUG(D_LFSCK, "%s: record slave LMV EA for the striped directory "
498                DFID": shard = "DFID", index = %u, flags = %u, flags2 = %u, "
499                "depth = %d\n", lfsck_lfsck2name(lfsck),
500                PFID(lfsck_dto2fid(dir)), PFID(fid),
501                index, flags, flags2, *depth);
502
503         if (index < 0 || index >= LFSCK_LMV_MAX_STRIPES)
504                 RETURN(-ERANGE);
505
506         if (index >= llmv->ll_stripes_allocated) {
507                 struct lfsck_slave_lmv_rec *new_lslr;
508                 int new_stripes = index + 1;
509                 size_t old_size = sizeof(*lslr) * llmv->ll_stripes_allocated;
510
511                 OBD_ALLOC_LARGE(new_lslr, sizeof(*new_lslr) * new_stripes);
512                 if (new_lslr == NULL) {
513                         llmv->ll_failed = 1;
514
515                         RETURN(-ENOMEM);
516                 }
517
518                 memcpy(new_lslr, llmv->ll_lslr, old_size);
519                 OBD_FREE_LARGE(llmv->ll_lslr, old_size);
520                 llmv->ll_stripes_allocated = new_stripes;
521                 llmv->ll_lslr = new_lslr;
522         }
523
524         lslr = llmv->ll_lslr + index;
525         if (unlikely(lu_fid_eq(&lslr->lslr_fid, fid)))
526                 RETURN(0);
527
528         if (fid_is_zero(&lslr->lslr_fid)) {
529                 lslr->lslr_fid = *fid;
530                 lslr->lslr_stripe_count = lmv->lmv_stripe_count;
531                 lslr->lslr_index = lmv->lmv_master_mdt_index;
532                 lslr->lslr_hash_type = lmv->lmv_hash_type;
533                 lslr->lslr_flags = flags;
534                 llmv->ll_stripes_filled++;
535                 if (flags == LSLF_NONE) {
536                         if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
537                             lmv_is_known_hash_type(lmv->lmv_hash_type))
538                                 llmv->ll_hash_type = lmv->lmv_hash_type;
539
540                         if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
541                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
542                                 llmv->ll_max_stripe_count =
543                                                         lslr->lslr_stripe_count;
544                 }
545
546                 if (llmv->ll_max_filled_off < index)
547                         llmv->ll_max_filled_off = index;
548
549                 RETURN(0);
550         }
551
552         (*depth)++;
553         if (flags != LSLF_BAD_INDEX2)
554                 LASSERTF(*depth == 1, "depth = %d\n", *depth);
555
556         /* Handle conflict cases. */
557         switch (lslr->lslr_flags) {
558         case LSLF_NONE:
559         case LSLF_BAD_INDEX2:
560                 /* The existing one is a normal valid object. */
561                 switch (flags) {
562                 case LSLF_NONE:
563                         /* The two 'valid' name entries claims the same
564                          * index, the LFSCK cannot distinguish which one
565                          * is correct. Then remove the master LMV EA to
566                          * make all shards to be visible to client, and
567                          * mark the master MDT-object as read-only. The
568                          * administrator can handle the conflict with
569                          * more human knowledge. */
570                         rc = lfsck_remove_lmv(env, com, lnr);
571                         break;
572                 case LSLF_BAD_INDEX2:
573                         GOTO(out, rc = -EEXIST);
574                 case LSLF_NO_LMVEA:
575
576 no_lmvea:
577                         if (llmv->ll_lmv.lmv_hash_type &
578                             LMV_HASH_FLAG_LOST_LMV) {
579                                 /* If the master LMV EA was re-generated
580                                  * by the former LFSCK reparation, and
581                                  * before such reparation, someone has
582                                  * created the conflict object, but the
583                                  * LFSCK did not detect such conflict,
584                                  * then we have to remove the master
585                                  * LMV EA and mark the master MDT-object
586                                  * as read-only. The administrator can
587                                  * handle the conflict with more human
588                                  * knowledge. */
589                                 rc = lfsck_remove_lmv(env, com, lnr);
590                         } else {
591                                 /* Otherwise, remove the current name entry,
592                                  * and add its FID in the LFSCK tracing file
593                                  * for further processing. */
594                                 rc = lfsck_namespace_trace_update(env, com, fid,
595                                                 LNTF_CHECK_PARENT, true);
596                                 if (rc == 0)
597                                         rc = lfsck_remove_dirent(env, com, dir,
598                                                                  fid, index);
599                         }
600
601                         break;
602                 case LSLF_DANGLING:
603                         /* Remove the current dangling name entry. */
604                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
605                         break;
606                 case LSLF_BAD_INDEX1:
607                         index = lmv->lmv_master_mdt_index;
608                         lmv->lmv_master_mdt_index = shard_idx;
609                         /* The name entry claims an index that is conflict
610                          * with a valid existing name entry, then try the
611                          * index in the lmv recursively. */
612                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
613                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
614                         lmv->lmv_master_mdt_index = index;
615                         if (rc == -ERANGE || rc == -EEXIST)
616                                 /* The index in the lmv is invalid or
617                                  * also conflict with other. Then we do
618                                  * not know how to resolve the conflict.
619                                  * We will handle it as handle the case
620                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
621                                 rc = lfsck_remove_lmv(env, com, lnr);
622
623                         break;
624                 default:
625                         break;
626                 }
627
628                 break;
629         case LSLF_NO_LMVEA:
630                 /* The existing one has no slave LMV EA. */
631                 switch (flags) {
632                 case LSLF_NONE:
633
634 none:
635                         if (llmv->ll_lmv.lmv_hash_type &
636                             LMV_HASH_FLAG_LOST_LMV) {
637                                 /* If the master LMV EA was re-generated
638                                  * by the former LFSCK reparation, and
639                                  * before such reparation, someone has
640                                  * created the conflict object, but the
641                                  * LFSCK did not detect such conflict,
642                                  * then we have to remove the master
643                                  * LMV EA and mark the master MDT-object
644                                  * as read-only. The administrator can
645                                  * handle the conflict with more human
646                                  * knowledge. */
647                                 rc = lfsck_remove_lmv(env, com, lnr);
648                         } else {
649                                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
650                                 lrls->lrls_fid = lslr->lslr_fid;
651                                 /* Otherwise, remove the existing name entry,
652                                  * and add its FID in the LFSCK tracing file
653                                  * for further processing. Refill the slot
654                                  * with current slave LMV EA. */
655                                 rc = lfsck_namespace_trace_update(env,
656                                                 com, &lrls->lrls_fid,
657                                                 LNTF_CHECK_PARENT, true);
658                                 if (rc == 0)
659                                         rc = lfsck_replace_lmv(env, com, lslr,
660                                                         lnr, lmv, index, flags);
661                         }
662
663                         break;
664                 case LSLF_BAD_INDEX2:
665                         if (flags2 >= lslr->lslr_flags)
666                                 GOTO(out, rc = -EEXIST);
667
668                         goto none;
669                 case LSLF_NO_LMVEA:
670                         goto no_lmvea;
671                 case LSLF_DANGLING:
672                         /* Remove the current dangling name entry. */
673                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
674                         break;
675                 case LSLF_BAD_INDEX1:
676                         index = lmv->lmv_master_mdt_index;
677                         lmv->lmv_master_mdt_index = shard_idx;
678                         /* The name entry claims an index that is conflict
679                          * with a valid existing name entry, then try the
680                          * index in the lmv recursively. */
681                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
682                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
683                         lmv->lmv_master_mdt_index = index;
684                         if (rc == -ERANGE || rc == -EEXIST) {
685                                 index = shard_idx;
686                                 goto no_lmvea;
687                         }
688
689                         break;
690                 default:
691                         break;
692                 }
693
694                 break;
695         case LSLF_DANGLING:
696                 /* The existing one is a dangling name entry. */
697                 switch (flags) {
698                 case LSLF_NONE:
699                 case LSLF_BAD_INDEX2:
700                 case LSLF_NO_LMVEA:
701                         /* Remove the existing dangling name entry.
702                          * Refill the lslr slot with the given LMV. */
703                         rc = lfsck_replace_lmv(env, com, lslr, lnr,
704                                                lmv, index, flags);
705                         break;
706                 case LSLF_DANGLING:
707                         /* Two dangling name entries conflict,
708                          * remove the current one. */
709                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
710                         break;
711                 case LSLF_BAD_INDEX1:
712                         index = lmv->lmv_master_mdt_index;
713                         lmv->lmv_master_mdt_index = shard_idx;
714                         /* The name entry claims an index that is conflict
715                          * with a valid existing name entry, then try the
716                          * index in the lmv recursively. */
717                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
718                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
719                         lmv->lmv_master_mdt_index = index;
720                         if (rc == -ERANGE || rc == -EEXIST)
721                                 /* If the index in the lmv is invalid or
722                                  * also conflict with other, then remove
723                                  * the existing dangling name entry.
724                                  * Refill the lslr slot with the given LMV. */
725                                 rc = lfsck_replace_lmv(env, com, lslr, lnr,
726                                                        lmv, shard_idx, flags);
727
728                         break;
729                 default:
730                         break;
731                 }
732
733                 break;
734         case LSLF_BAD_INDEX1: {
735                 if (*depth >= LFSCK_REC_LMV_MAX_DEPTH)
736                         goto conflict;
737
738                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
739                 lrls->lrls_fid = lnr->lnr_fid;
740                 lrls->lrls_lmv = *lmv;
741
742                 lnr->lnr_fid = lslr->lslr_fid;
743                 lmv->lmv_master_mdt_index = index;
744                 lmv->lmv_stripe_count = lslr->lslr_stripe_count;
745                 lmv->lmv_hash_type = lslr->lslr_hash_type;
746                 index = lslr->lslr_index;
747
748                 /* The existing one has another possible slot,
749                  * try it recursively. */
750                 rc = lfsck_record_lmv(env, com, lnr, lmv, index,
751                                       LSLF_BAD_INDEX2, flags, depth);
752                 *lmv = lrls->lrls_lmv;
753                 lnr->lnr_fid = lrls->lrls_fid;
754                 index = shard_idx;
755                 if (rc != 0) {
756                         if (rc == -ERANGE || rc == -EEXIST)
757                                 goto conflict;
758
759                         break;
760                 }
761
762                 lslr->lslr_fid = *fid;
763                 lslr->lslr_flags = flags;
764                 lslr->lslr_stripe_count = lmv->lmv_stripe_count;
765                 lslr->lslr_index = lmv->lmv_master_mdt_index;
766                 lslr->lslr_hash_type = lmv->lmv_hash_type;
767                 if (flags == LSLF_NONE) {
768                         if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
769                             lmv_is_known_hash_type(lmv->lmv_hash_type))
770                                 llmv->ll_hash_type = lmv->lmv_hash_type;
771
772                         if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
773                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
774                                 llmv->ll_max_stripe_count =
775                                                         lslr->lslr_stripe_count;
776                 }
777
778                 break;
779
780 conflict:
781                 switch (flags) {
782                 case LSLF_NONE:
783                         /* The two 'valid' name entries claims the same
784                          * index, the LFSCK cannot distinguish which one
785                          * is correct. Then remove the master LMV EA to
786                          * make all shards to be visible to client, and
787                          * mark the master MDT-object as read-only. The
788                          * administrator can handle the conflict with
789                          * more human knowledge. */
790                         rc = lfsck_remove_lmv(env, com, lnr);
791                         break;
792                 case LSLF_BAD_INDEX2:
793                         GOTO(out, rc = -EEXIST);
794                 case LSLF_NO_LMVEA:
795                         goto no_lmvea;
796                 case LSLF_DANGLING:
797                         /* Remove the current dangling name entry. */
798                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
799                         break;
800                 case LSLF_BAD_INDEX1:
801                         index = lmv->lmv_master_mdt_index;
802                         lmv->lmv_master_mdt_index = shard_idx;
803                         /* The name entry claims an index that is conflict
804                          * with a valid existing name entry, then try the
805                          * index in the lmv recursively. */
806                         rc = lfsck_record_lmv(env, com, lnr, lmv, index,
807                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
808                         lmv->lmv_master_mdt_index = index;
809                         if (rc == -ERANGE || rc == -EEXIST)
810                                 /* The index in the lmv is invalid or
811                                  * also conflict with other. Then we do
812                                  * not know how to resolve the conflict.
813                                  * We will handle it as handle the case
814                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
815                                 rc = lfsck_remove_lmv(env, com, lnr);
816
817                         break;
818                 }
819
820                 break;
821         }
822         default:
823                 break;
824         }
825
826         if (rc < 0)
827                 llmv->ll_failed = 1;
828
829         GOTO(out, rc);
830
831 out:
832         (*depth)--;
833
834         return rc > 0 ? 0 : rc;
835 }
836
837 int lfsck_read_stripe_lmv(const struct lu_env *env, struct dt_object *obj,
838                           struct lmv_mds_md_v1 *lmv)
839 {
840         struct dt_object *bottom;
841         int               rc;
842
843         /* Currently, we only store the LMV header on disk. It is the LOD's
844          * duty to iterate the master MDT-object's directory to compose the
845          * integrated LMV EA. But here, we only want to load the LMV header,
846          * so we need to bypass LOD to avoid unnecessary iteration in LOD. */
847         bottom = lu2dt(container_of0(obj->do_lu.lo_header->loh_layers.prev,
848                                      struct lu_object, lo_linkage));
849         if (unlikely(bottom == NULL))
850                 return -ENOENT;
851
852         dt_read_lock(env, bottom, 0);
853         rc = dt_xattr_get(env, bottom, lfsck_buf_get(env, lmv, sizeof(*lmv)),
854                           XATTR_NAME_LMV, BYPASS_CAPA);
855         dt_read_unlock(env, bottom);
856         if (rc != sizeof(*lmv))
857                 return rc > 0 ? -EINVAL : rc;
858
859         lfsck_lmv_header_le_to_cpu(lmv, lmv);
860         if ((lmv->lmv_magic == LMV_MAGIC &&
861              !(lmv->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) ||
862             (lmv->lmv_magic == LMV_MAGIC_STRIPE &&
863              !(lmv->lmv_hash_type & LMV_HASH_FLAG_DEAD)))
864                 return 0;
865
866         return -ENODATA;
867 }
868
869 /**
870  * Parse the shard's index from the given shard name.
871  *
872  * The valid shard name/type should be:
873  * 1) The type must be S_IFDIR
874  * 2) The name should be $FID:$index
875  * 3) the index should within valid range.
876  *
877  * \param[in] env       pointer to the thread context
878  * \param[in] name      the shard name
879  * \param[in] namelen   the name length
880  * \param[in] type      the entry's type
881  * \param[in] fid       the entry's FID
882  *
883  * \retval              zero or positive number for the index from the name
884  * \retval              negative error number on failure
885  */
886 int lfsck_shard_name_to_index(const struct lu_env *env, const char *name,
887                               int namelen, __u16 type, const struct lu_fid *fid)
888 {
889         char    *name2  = lfsck_env_info(env)->lti_tmpbuf2;
890         int      len;
891         int      idx    = 0;
892
893         if (!S_ISDIR(type))
894                 return -ENOTDIR;
895
896         LASSERT(name != name2);
897
898         len = snprintf(name2, sizeof(lfsck_env_info(env)->lti_tmpbuf2),
899                        DFID":", PFID(fid));
900         if (namelen < len + 1 || memcmp(name, name2, len) != 0)
901                 return -EINVAL;
902
903         do {
904                 if (!isdigit(name[len]))
905                         return -EINVAL;
906
907                 idx = idx * 10 + name[len++] - '0';
908         } while (len < namelen);
909
910         if (idx >= LFSCK_LMV_MAX_STRIPES)
911                 return -EINVAL;
912
913         return idx;
914 }
915
916 bool lfsck_is_valid_slave_name_entry(const struct lu_env *env,
917                                      struct lfsck_lmv *llmv,
918                                      const char *name, int namelen)
919 {
920         struct lmv_mds_md_v1    *lmv;
921         int                      idx;
922
923         if (llmv == NULL || !llmv->ll_lmv_slave || !llmv->ll_lmv_verified)
924                 return true;
925
926         lmv = &llmv->ll_lmv;
927         idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
928                                        lmv->lmv_stripe_count,
929                                        name, namelen);
930         if (unlikely(idx != lmv->lmv_master_mdt_index))
931                 return false;
932
933         return true;
934 }
935
936 /**
937  * Check whether the given name is a valid entry under the @parent.
938  *
939  * If the @parent is a striped directory then the @child should one
940  * shard of the striped directory, its name should be $FID:$index.
941  *
942  * If the @parent is a shard of a striped directory, then the name hash
943  * should match the MDT, otherwise it is invalid.
944  *
945  * \param[in] env       pointer to the thread context
946  * \param[in] parent    the parent directory
947  * \param[in] child     the child object to be checked
948  * \param[in] cname     the name for the @child in the parent directory
949  *
950  * \retval              positive number for invalid name entry
951  * \retval              0 if the name is valid or uncertain
952  * \retval              negative error number on failure
953  */
954 int lfsck_namespace_check_name(const struct lu_env *env,
955                                struct dt_object *parent,
956                                struct dt_object *child,
957                                const struct lu_name *cname)
958 {
959         struct lmv_mds_md_v1    *lmv = &lfsck_env_info(env)->lti_lmv;
960         int                      idx;
961         int                      rc;
962
963         rc = lfsck_read_stripe_lmv(env, parent, lmv);
964         if (rc != 0)
965                 RETURN(rc == -ENODATA ? 0 : rc);
966
967         if (lmv->lmv_magic == LMV_MAGIC_STRIPE) {
968                 if (!lfsck_is_valid_slave_lmv(lmv))
969                         return 0;
970
971                 idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
972                                                lmv->lmv_stripe_count,
973                                                cname->ln_name,
974                                                cname->ln_namelen);
975                 if (unlikely(idx != lmv->lmv_master_mdt_index))
976                         return 1;
977         } else if (lfsck_shard_name_to_index(env, cname->ln_name,
978                         cname->ln_namelen, lfsck_object_type(child),
979                         lfsck_dto2fid(child)) < 0) {
980                 return 1;
981         }
982
983         return 0;
984 }
985
986 /**
987  * Update the object's LMV EA with the given @lmv.
988  *
989  * \param[in] env       pointer to the thread context
990  * \param[in] com       pointer to the lfsck component
991  * \param[in] obj       pointer to the object which LMV EA will be updated
992  * \param[in] lmv       pointer to buffer holding the new LMV EA
993  * \param[in] locked    whether the caller has held ldlm lock on the @obj or not
994  *
995  * \retval              positive number for nothing to be done
996  * \retval              zero if updated successfully
997  * \retval              negative error number on failure
998  */
999 int lfsck_namespace_update_lmv(const struct lu_env *env,
1000                                struct lfsck_component *com,
1001                                struct dt_object *obj,
1002                                struct lmv_mds_md_v1 *lmv, bool locked)
1003 {
1004         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1005         struct lmv_mds_md_v1            *lmv4   = &info->lti_lmv4;
1006         struct lu_buf                   *buf    = &info->lti_buf;
1007         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1008         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
1009         struct thandle                  *th     = NULL;
1010         struct lustre_handle             lh     = { 0 };
1011         int                              rc     = 0;
1012         int                              rc1    = 0;
1013         ENTRY;
1014
1015         LASSERT(lmv4 != lmv);
1016
1017         lfsck_lmv_header_cpu_to_le(lmv4, lmv);
1018         lfsck_buf_init(buf, lmv4, sizeof(*lmv4));
1019
1020         if (!locked) {
1021                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1022                                       MDS_INODELOCK_UPDATE |
1023                                       MDS_INODELOCK_XATTR, LCK_EX);
1024                 if (rc != 0)
1025                         GOTO(log, rc);
1026         }
1027
1028         th = dt_trans_create(env, dev);
1029         if (IS_ERR(th))
1030                 GOTO(log, rc = PTR_ERR(th));
1031
1032         /* For remote updating LMV EA, there will be further LFSCK action on
1033          * remote MDT after the updating, so update the LMV EA synchronously. */
1034         if (dt_object_remote(obj))
1035                 th->th_sync = 1;
1036
1037         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th);
1038         if (rc != 0)
1039                 GOTO(stop, rc);
1040
1041         rc = dt_trans_start_local(env, dev, th);
1042         if (rc != 0)
1043                 GOTO(stop, rc);
1044
1045         dt_write_lock(env, obj, 0);
1046         if (unlikely(lfsck_is_dead_obj(obj)))
1047                 GOTO(unlock, rc = 1);
1048
1049         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1050                 GOTO(unlock, rc = 0);
1051
1052         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th, BYPASS_CAPA);
1053
1054         GOTO(unlock, rc);
1055
1056 unlock:
1057         dt_write_unlock(env, obj);
1058
1059 stop:
1060         rc1 = dt_trans_stop(env, dev, th);
1061         if (rc == 0)
1062                 rc = rc1;
1063
1064 log:
1065         lfsck_ibits_unlock(&lh, LCK_EX);
1066         CDEBUG(D_LFSCK, "%s: namespace LFSCK updated the %s LMV EA "
1067                "for the object "DFID": rc = %d\n",
1068                lfsck_lfsck2name(lfsck),
1069                lmv->lmv_magic == LMV_MAGIC ? "master" : "slave",
1070                PFID(lfsck_dto2fid(obj)), rc);
1071
1072         return rc;
1073 }
1074
1075 /**
1076  * Check whether allow to re-genereate the lost master LMV EA.
1077  *
1078  * If the master MDT-object of the striped directory lost its master LMV EA,
1079  * then before the LFSCK repaired the striped directory, some ones may have
1080  * created some objects (that are not normal shards of the striped directory)
1081  * under the master MDT-object. If such case happend, then the LFSCK cannot
1082  * re-generate the lost master LMV EA to keep those objects to be visible to
1083  * client.
1084  *
1085  * \param[in] env       pointer to the thread context
1086  * \param[in] com       pointer to the lfsck component
1087  * \param[in] obj       pointer to the master MDT-object to be checked
1088  * \param[in] cfid      the shard's FID used for verification
1089  * \param[in] cidx      the shard's index used for verification
1090  *
1091  * \retval              positive number if not allow to re-generate LMV EA
1092  * \retval              zero if allow to re-generate LMV EA
1093  * \retval              negative error number on failure
1094  */
1095 static int lfsck_allow_regenerate_master_lmv(const struct lu_env *env,
1096                                              struct lfsck_component *com,
1097                                              struct dt_object *obj,
1098                                              const struct lu_fid *cfid,
1099                                              __u32 cidx)
1100 {
1101         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1102         struct lu_fid                   *tfid   = &info->lti_fid3;
1103         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1104         struct lu_dirent                *ent    =
1105                         (struct lu_dirent *)info->lti_key;
1106         const struct dt_it_ops          *iops;
1107         struct dt_it                    *di;
1108         __u64                            cookie;
1109         __u32                            args;
1110         int                              rc;
1111         __u16                            type;
1112         ENTRY;
1113
1114         if (unlikely(!dt_try_as_dir(env, obj)))
1115                 RETURN(-ENOTDIR);
1116
1117         /* Check whether the shard and the master MDT-object matches or not. */
1118         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
1119                  PFID(cfid), cidx);
1120         rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
1121                        (const struct dt_key *)info->lti_tmpbuf, BYPASS_CAPA);
1122         if (rc != 0)
1123                 RETURN(rc);
1124
1125         if (!lu_fid_eq(tfid, cfid))
1126                 RETURN(-ENOENT);
1127
1128         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1129         iops = &obj->do_index_ops->dio_it;
1130         di = iops->init(env, obj, args, BYPASS_CAPA);
1131         if (IS_ERR(di))
1132                 RETURN(PTR_ERR(di));
1133
1134         rc = iops->load(env, di, 0);
1135         if (rc == 0)
1136                 rc = iops->next(env, di);
1137         else if (rc > 0)
1138                 rc = 0;
1139
1140         if (rc != 0)
1141                 GOTO(out, rc);
1142
1143         do {
1144                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1145                 if (rc == 0)
1146                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1147
1148                 if (rc != 0)
1149                         GOTO(out, rc);
1150
1151                 /* skip dot and dotdot entries */
1152                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1153                         goto next;
1154
1155                 /* If the subdir name does not match the shard name rule, then
1156                  * it is quite possible that it is NOT a shard, but created by
1157                  * someone after the master MDT-object lost the master LMV EA.
1158                  * But it is also possible that the subdir name entry crashed,
1159                  * under such double failure cases, the LFSCK cannot know how
1160                  * to repair the inconsistency. For data safe, the LFSCK will
1161                  * mark the master MDT-object as read-only. The administrator
1162                  * can fix the bad shard name manually, then run LFSCK again.
1163                  *
1164                  * XXX: If the subdir name matches the shard name rule, but it
1165                  *      is not a real shard of the striped directory, instead,
1166                  *      it was created by someone after the master MDT-object
1167                  *      lost the LMV EA, then re-generating the master LMV EA
1168                  *      will cause such subdir to be invisible to client, and
1169                  *      if its index occupies some lost shard index, then the
1170                  *      LFSCK will use it to replace the bad shard, and cause
1171                  *      the subdir (itself) to be invisible for ever. */
1172                 if (lfsck_shard_name_to_index(env, ent->lde_name,
1173                                 ent->lde_namelen, type, &ent->lde_fid) < 0)
1174                         GOTO(out, rc = 1);
1175
1176 next:
1177                 rc = iops->next(env, di);
1178         } while (rc == 0);
1179
1180         GOTO(out, rc = 0);
1181
1182 out:
1183         iops->put(env, di);
1184         iops->fini(env, di);
1185
1186         return rc;
1187 }
1188
1189 /**
1190  * Notify remote LFSCK instance that the object's LMV EA has been updated.
1191  *
1192  * \param[in] env       pointer to the thread context
1193  * \param[in] com       pointer to the lfsck component
1194  * \param[in] obj       pointer to the object on which the LMV EA will be set
1195  * \param[in] event     indicate either master or slave LMV EA has been updated
1196  * \param[in] flags     indicate which element(s) in the LMV EA has been updated
1197  * \param[in] index     the MDT index on which the LFSCK instance to be notified
1198  *
1199  * \retval              positive number if nothing to be done
1200  * \retval              zero for succeed
1201  * \retval              negative error number on failure
1202  */
1203 static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
1204                                              struct lfsck_component *com,
1205                                              struct dt_object *obj,
1206                                              __u32 event, __u32 flags,
1207                                              __u32 index)
1208 {
1209         struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
1210         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
1211         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1212         struct lfsck_tgt_desc           *ltd    = NULL;
1213         struct ptlrpc_request           *req    = NULL;
1214         int                              rc;
1215         ENTRY;
1216
1217         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
1218         if (ltd == NULL)
1219                 GOTO(out, rc = -ENODEV);
1220
1221         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1222                                    &RQF_LFSCK_NOTIFY);
1223         if (req == NULL)
1224                 GOTO(out, rc = -ENOMEM);
1225
1226         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1227         if (rc != 0) {
1228                 ptlrpc_request_free(req);
1229
1230                 GOTO(out, rc);
1231         }
1232
1233         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1234         memset(lr, 0, sizeof(*lr));
1235         lr->lr_event = event;
1236         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1237         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1238         lr->lr_fid = *fid;
1239         lr->lr_flags = flags;
1240
1241         ptlrpc_request_set_replen(req);
1242         rc = ptlrpc_queue_wait(req);
1243         ptlrpc_req_finished(req);
1244
1245         GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
1246
1247 out:
1248         CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
1249                "object "DFID" on MDT %x remotely with event %u, flags %u: "
1250                "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
1251                event, flags, rc);
1252
1253         if (ltd != NULL)
1254                 lfsck_tgt_put(ltd);
1255
1256         return rc;
1257 }
1258
1259 /**
1260  * Generate request for local LFSCK instance to rescan the striped directory.
1261  *
1262  * \param[in] env       pointer to the thread context
1263  * \param[in] com       pointer to the lfsck component
1264  * \param[in] obj       pointer to the striped directory to be rescanned
1265  *
1266  * \retval              positive number if nothing to be done
1267  * \retval              zero for succeed
1268  * \retval              negative error number on failure
1269  */
1270 int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
1271                                             struct lfsck_component *com,
1272                                             struct dt_object *obj)
1273 {
1274         struct lfsck_instance      *lfsck = com->lc_lfsck;
1275         struct lfsck_namespace     *ns    = com->lc_file_ram;
1276         struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
1277         struct lfsck_lmv_unit      *llu;
1278         struct lfsck_lmv           *llmv;
1279         struct lfsck_slave_lmv_rec *lslr;
1280         int                         count = 0;
1281         int                         rc;
1282         ENTRY;
1283
1284         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1285                 RETURN(0);
1286
1287         rc = lfsck_read_stripe_lmv(env, obj, lmv4);
1288         if (rc != 0)
1289                 RETURN(rc);
1290
1291         OBD_ALLOC_PTR(llu);
1292         if (unlikely(llu == NULL))
1293                 RETURN(-ENOMEM);
1294
1295         if (lmv4->lmv_stripe_count < 1)
1296                 count = LFSCK_LMV_DEF_STRIPES;
1297         else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1298                 count = LFSCK_LMV_MAX_STRIPES;
1299         else
1300                 count = lmv4->lmv_stripe_count;
1301
1302         OBD_ALLOC_LARGE(lslr, sizeof(struct lfsck_slave_lmv_rec) * count);
1303         if (lslr == NULL) {
1304                 OBD_FREE_PTR(llu);
1305
1306                 RETURN(-ENOMEM);
1307         }
1308
1309         INIT_LIST_HEAD(&llu->llu_link);
1310         llu->llu_lfsck = lfsck;
1311         llu->llu_obj = lfsck_object_get(obj);
1312         llmv = &llu->llu_lmv;
1313         llmv->ll_lmv_master = 1;
1314         llmv->ll_inline = 1;
1315         atomic_set(&llmv->ll_ref, 1);
1316         llmv->ll_stripes_allocated = count;
1317         llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
1318         llmv->ll_lslr = lslr;
1319         llmv->ll_lmv = *lmv4;
1320
1321         down_write(&com->lc_sem);
1322         if (ns->ln_status != LS_SCANNING_PHASE1 &&
1323             ns->ln_status != LS_SCANNING_PHASE2) {
1324                 ns->ln_striped_dirs_skipped++;
1325                 up_write(&com->lc_sem);
1326                 lfsck_lmv_put(env, llmv);
1327         } else {
1328                 ns->ln_striped_dirs_repaired++;
1329                 spin_lock(&lfsck->li_lock);
1330                 list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
1331                 spin_unlock(&lfsck->li_lock);
1332                 up_write(&com->lc_sem);
1333         }
1334
1335         RETURN(0);
1336 }
1337
1338 /**
1339  * Set master LMV EA for the specified striped directory.
1340  *
1341  * First, if the master MDT-object of a striped directory lost its LMV EA,
1342  * then there may be some users have created some files under the master
1343  * MDT-object directly. Under such case, the LFSCK cannot re-generate LMV
1344  * EA for the master MDT-object, because we should keep the existing files
1345  * to be visible to client. Then the LFSCK will mark the striped directory
1346  * as read-only and keep it there to be handled by administrator manually.
1347  *
1348  * If nobody has created files under the master MDT-object of the striped
1349  * directory, then we will set the master LMV EA and generate a new rescan
1350  * (the striped directory) request that will be handled later by the LFSCK
1351  * instance on the MDT later.
1352  *
1353  * \param[in] env       pointer to the thread context
1354  * \param[in] com       pointer to the lfsck component
1355  * \param[in] dir       pointer to the object on which the LMV EA will be set
1356  * \param[in] lmv       pointer to the buffer holding the new LMV EA
1357  * \param[in] cfid      the shard's FID used for verification
1358  * \param[in] cidx      the shard's index used for verification
1359  * \param[in] flags     to indicate which element(s) in the LMV EA will be set
1360  *
1361  * \retval              positive number if nothing to be done
1362  * \retval              zero for succeed
1363  * \retval              negative error number on failure
1364  */
1365 static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
1366                                           struct lfsck_component *com,
1367                                           struct dt_object *dir,
1368                                           struct lmv_mds_md_v1 *lmv,
1369                                           const struct lu_fid *cfid,
1370                                           __u32 cidx, __u32 flags)
1371 {
1372         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1373         struct lmv_mds_md_v1            *lmv3   = &info->lti_lmv3;
1374         struct lu_seq_range             *range  = &info->lti_range;
1375         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1376         struct seq_server_site          *ss     =
1377                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1378         struct dt_object                *obj;
1379         struct lustre_handle             lh     = { 0 };
1380         int                              pidx   = -1;
1381         int                              rc     = 0;
1382         ENTRY;
1383
1384         /* Find the bottom object to bypass LOD when set LMV EA. */
1385         obj = lu2dt(container_of0(dir->do_lu.lo_header->loh_layers.prev,
1386                                   struct lu_object, lo_linkage));
1387         if (unlikely(obj == NULL))
1388                 RETURN(-ENOENT);
1389
1390         fld_range_set_mdt(range);
1391         rc = fld_server_lookup(env, ss->ss_server_fld,
1392                                fid_seq(lfsck_dto2fid(obj)), range);
1393         if (rc != 0)
1394                 GOTO(log, rc);
1395
1396         pidx = range->lsr_index;
1397         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1398                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1399                               LCK_EX);
1400         if (rc != 0)
1401                 GOTO(log, rc);
1402
1403         rc = lfsck_read_stripe_lmv(env, obj, lmv3);
1404         if (rc == -ENODATA) {
1405                 if (!(flags & LEF_SET_LMV_ALL))
1406                         GOTO(log, rc);
1407
1408                 *lmv3 = *lmv;
1409         } else if (rc == 0) {
1410                 if (flags & LEF_SET_LMV_ALL)
1411                         GOTO(log, rc = 1);
1412
1413                 if (flags & LEF_SET_LMV_HASH)
1414                         lmv3->lmv_hash_type = lmv->lmv_hash_type;
1415         } else {
1416                 GOTO(log, rc);
1417         }
1418
1419         lmv3->lmv_magic = LMV_MAGIC;
1420         lmv3->lmv_master_mdt_index = pidx;
1421
1422         if (flags & LEF_SET_LMV_ALL) {
1423                 rc = lfsck_allow_regenerate_master_lmv(env, com, obj,
1424                                                        cfid, cidx);
1425                 if (rc > 0) {
1426                         rc = lfsck_disable_master_lmv(env, com, obj, false);
1427
1428                         GOTO(log, rc = (rc == 0 ? 1 : rc));
1429                 }
1430
1431                 if (rc < 0)
1432                         GOTO(log, rc);
1433
1434                 /* To indicate that the master has ever lost LMV EA. */
1435                 lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
1436         }
1437
1438         rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
1439         if (rc == 0 && flags & LEF_SET_LMV_ALL) {
1440                 if (dt_object_remote(obj))
1441                         rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
1442                                                 LE_SET_LMV_MASTER, 0, pidx);
1443                 else
1444                         rc = lfsck_namespace_notify_lmv_master_local(env, com,
1445                                                                      obj);
1446         }
1447
1448         GOTO(log, rc);
1449
1450 log:
1451         lfsck_ibits_unlock(&lh, LCK_EX);
1452         CDEBUG(D_LFSCK, "%s: namespace LFSCK set master LMV EA for the object "
1453                DFID" on the %s MDT %d, flags %x: rc = %d\n",
1454                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1455                dt_object_remote(obj) ? "remote" : "local", pidx, flags, rc);
1456
1457         if (rc <= 0) {
1458                 struct lfsck_namespace *ns = com->lc_file_ram;
1459
1460                 ns->ln_flags |= LF_INCONSISTENT;
1461         }
1462
1463         return rc;
1464 }
1465
1466 /**
1467  * Repair the bad name hash.
1468  *
1469  * If the name hash of some name entry under the striped directory does not
1470  * match the shard of the striped directory, then the LFSCK will repair the
1471  * inconsistency. Ideally, the LFSCK should migrate the name entry from the
1472  * current MDT to the right MDT (another one), but before the async commit
1473  * finished, the LFSCK will change the striped directory's hash type as
1474  * LMV_HASH_TYPE_UNKNOWN and mark the lmv flags as LMV_HASH_FLAG_BAD_TYPE.
1475  *
1476  * \param[in] env       pointer to the thread context
1477  * \param[in] com       pointer to the lfsck component
1478  * \param[in] shard     pointer to the shard of the striped directory that
1479  *                      contains the bad name entry
1480  * \param[in] llmv      pointer to lfsck LMV EA structure
1481  * \param[in] name      the name of the bad name hash
1482  *
1483  * \retval              positive number if nothing to be done
1484  * \retval              zero for succeed
1485  * \retval              negative error number on failure
1486  */
1487 int lfsck_namespace_repair_bad_name_hash(const struct lu_env *env,
1488                                          struct lfsck_component *com,
1489                                          struct dt_object *shard,
1490                                          struct lfsck_lmv *llmv,
1491                                          const char *name)
1492 {
1493         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1494         struct lu_fid                   *pfid   = &info->lti_fid3;
1495         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1496         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1497         struct dt_object                *parent = NULL;
1498         int                              rc     = 0;
1499         ENTRY;
1500
1501         rc = dt_lookup(env, shard, (struct dt_rec *)pfid,
1502                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1503         if (rc != 0 || !fid_is_sane(pfid))
1504                 GOTO(log, rc);
1505
1506         parent = lfsck_object_find_bottom(env, lfsck, pfid);
1507         if (IS_ERR(parent))
1508                 GOTO(log, rc = PTR_ERR(parent));
1509
1510         *lmv2 = llmv->ll_lmv;
1511         lmv2->lmv_hash_type = LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE;
1512         rc = lfsck_namespace_set_lmv_master(env, com, parent, lmv2,
1513                                             lfsck_dto2fid(shard),
1514                                             llmv->ll_lmv.lmv_master_mdt_index,
1515                                             LEF_SET_LMV_HASH);
1516
1517         GOTO(log, rc);
1518
1519 log:
1520         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name hash "
1521                "on the MDT %x, parent "DFID", name %s, shard_%x "DFID
1522                ": rc = %d\n",
1523                lfsck_lfsck2name(lfsck), lfsck_dev_idx(lfsck->li_bottom),
1524                PFID(pfid), name, llmv->ll_lmv.lmv_master_mdt_index,
1525                PFID(lfsck_dto2fid(shard)), rc);
1526
1527         if (parent != NULL && !IS_ERR(parent))
1528                 lfsck_object_put(env, parent);
1529
1530         return rc;
1531 }
1532
1533 /**
1534  * Scan the shard of a striped directory for name hash verification.
1535  *
1536  * During the first-stage scanning, if the LFSCK cannot make sure whether
1537  * the shard of a stripe directory contains valid slave LMV EA or not, then
1538  * it will skip the name hash verification for this shard temporarily, and
1539  * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
1540  * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
1541  * Then in the second-stage scanning, the shard will be re-scanned, and for
1542  * every name entry under the shard, the name hash will be verified, and for
1543  * unmatched name entry, the LFSCK will try to fix it.
1544  *
1545  * \param[in] env       pointer to the thread context
1546  * \param[in] com       pointer to the lfsck component
1547  * \param[in] child     pointer to the directory object to be handled
1548  *
1549  * \retval              positive number for scanning successfully
1550  * \retval              zero for the scanning is paused
1551  * \retval              negative error number on failure
1552  */
1553 int lfsck_namespace_scan_shard(const struct lu_env *env,
1554                                struct lfsck_component *com,
1555                                struct dt_object *child)
1556 {
1557         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1558         struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
1559         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1560         struct lfsck_namespace          *ns     = com->lc_file_ram;
1561         struct ptlrpc_thread            *thread = &lfsck->li_thread;
1562         struct lu_dirent                *ent    =
1563                         (struct lu_dirent *)info->lti_key;
1564         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1565         struct lfsck_lmv                *llmv   = NULL;
1566         const struct dt_it_ops          *iops;
1567         struct dt_it                    *di;
1568         __u64                            cookie;
1569         __u32                            args;
1570         int                              rc;
1571         __u16                            type;
1572         ENTRY;
1573
1574         rc = lfsck_read_stripe_lmv(env, child, lmv);
1575         if (rc != 0)
1576                 RETURN(rc == -ENODATA ? 1 : rc);
1577
1578         if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
1579                 RETURN(1);
1580
1581         if (unlikely(!dt_try_as_dir(env, child)))
1582                 RETURN(-ENOTDIR);
1583
1584         OBD_ALLOC_PTR(llmv);
1585         if (llmv == NULL)
1586                 RETURN(-ENOMEM);
1587
1588         llmv->ll_lmv_slave = 1;
1589         llmv->ll_lmv_verified = 1;
1590         llmv->ll_lmv = *lmv;
1591         atomic_set(&llmv->ll_ref, 1);
1592
1593         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1594         iops = &child->do_index_ops->dio_it;
1595         di = iops->init(env, child, args, BYPASS_CAPA);
1596         if (IS_ERR(di))
1597                 GOTO(out, rc = PTR_ERR(di));
1598
1599         rc = iops->load(env, di, 0);
1600         if (rc == 0)
1601                 rc = iops->next(env, di);
1602         else if (rc > 0)
1603                 rc = 0;
1604
1605         while (rc == 0) {
1606                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1607                     cfs_fail_val > 0) {
1608                         struct l_wait_info lwi;
1609
1610                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1611                                           NULL, NULL);
1612                         l_wait_event(thread->t_ctl_waitq,
1613                                      !thread_is_running(thread),
1614                                      &lwi);
1615
1616                         if (unlikely(!thread_is_running(thread)))
1617                                 GOTO(out, rc = 0);
1618                 }
1619
1620                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1621                 if (rc == 0)
1622                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1623
1624                 if (rc != 0) {
1625                         if (bk->lb_param & LPF_FAILOUT)
1626                                 GOTO(out, rc);
1627
1628                         goto next;
1629                 }
1630
1631                 /* skip dot and dotdot entries */
1632                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1633                         goto next;
1634
1635                 if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
1636                                                      ent->lde_namelen)) {
1637                         ns->ln_flags |= LF_INCONSISTENT;
1638                         rc = lfsck_namespace_repair_bad_name_hash(env, com,
1639                                                 child, llmv, ent->lde_name);
1640                         if (rc >= 0)
1641                                 ns->ln_name_hash_repaired++;
1642                 }
1643
1644                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1645                         GOTO(out, rc);
1646
1647                 /* Rate control. */
1648                 lfsck_control_speed(lfsck);
1649                 if (unlikely(!thread_is_running(thread)))
1650                         GOTO(out, rc = 0);
1651
1652                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
1653                         spin_lock(&lfsck->li_lock);
1654                         thread_set_flags(thread, SVC_STOPPING);
1655                         spin_unlock(&lfsck->li_lock);
1656
1657                         GOTO(out, rc = -EINVAL);
1658                 }
1659
1660 next:
1661                 rc = iops->next(env, di);
1662         }
1663
1664         GOTO(out, rc);
1665
1666 out:
1667         iops->put(env, di);
1668         iops->fini(env, di);
1669         lfsck_lmv_put(env, llmv);
1670
1671         return rc;
1672 }
1673
1674 /**
1675  * Verify the slave object's (of striped directory) LMV EA.
1676  *
1677  * For the slave object of a striped directory, before traversing the shard
1678  * the LFSCK will verify whether its slave LMV EA matches its parent's master
1679  * LMV EA or not.
1680  *
1681  * \param[in] env       pointer to the thread context
1682  * \param[in] com       pointer to the lfsck component
1683  * \param[in] obj       pointer to the object which LMV EA will be checked
1684  * \param[in] llmv      pointer to buffer holding the slave LMV EA
1685  *
1686  * \retval              zero for succeed
1687  * \retval              negative error number on failure
1688  */
1689 int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
1690                                         struct lfsck_component *com,
1691                                         struct dt_object *obj,
1692                                         struct lfsck_lmv *llmv)
1693 {
1694         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1695         char                            *name   = info->lti_key;
1696         char                            *name2;
1697         struct lu_fid                   *pfid   = &info->lti_fid3;
1698         struct lu_fid                   *tfid   = &info->lti_fid4;
1699         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
1700         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1701         struct lmv_mds_md_v1            *clmv   = &llmv->ll_lmv;
1702         struct lmv_mds_md_v1            *plmv   = &info->lti_lmv;
1703         struct dt_object                *parent = NULL;
1704         int                              rc     = 0;
1705         ENTRY;
1706
1707         if (!lfsck_is_valid_slave_lmv(clmv)) {
1708                 rc = lfsck_namespace_trace_update(env, com, cfid,
1709                                         LNTF_UNCERTAIN_LMV, true);
1710
1711                 GOTO(out, rc);
1712         }
1713
1714         rc = dt_lookup(env, obj, (struct dt_rec *)pfid,
1715                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1716         if (rc != 0 || !fid_is_sane(pfid)) {
1717                 rc = lfsck_namespace_trace_update(env, com, cfid,
1718                                         LNTF_UNCERTAIN_LMV, true);
1719
1720                 GOTO(out, rc);
1721         }
1722
1723         parent = lfsck_object_find(env, lfsck, pfid);
1724         if (IS_ERR(parent)) {
1725                 rc = lfsck_namespace_trace_update(env, com, cfid,
1726                                         LNTF_UNCERTAIN_LMV, true);
1727
1728                 GOTO(out, rc);
1729         }
1730
1731         if (unlikely(!dt_try_as_dir(env, parent)))
1732                 GOTO(out, rc = -ENOTDIR);
1733
1734         rc = lfsck_read_stripe_lmv(env, parent, plmv);
1735         if (rc != 0) {
1736                 int rc1;
1737
1738                 /* If the parent has no LMV EA, then it maybe because:
1739                  * 1) The parent lost the LMV EA.
1740                  * 2) The child claims a wrong (slave) LMV EA. */
1741                 if (rc == -ENODATA)
1742                         rc = lfsck_namespace_set_lmv_master(env, com, parent,
1743                                         clmv, cfid, clmv->lmv_master_mdt_index,
1744                                         LEF_SET_LMV_ALL);
1745                 else
1746                         rc = 0;
1747
1748                 rc1 = lfsck_namespace_trace_update(env, com, cfid,
1749                                                    LNTF_UNCERTAIN_LMV, true);
1750
1751                 GOTO(out, rc = (rc < 0 ? rc : rc1));
1752         }
1753
1754         /* Unmatched magic or stripe count. */
1755         if (unlikely(plmv->lmv_magic != LMV_MAGIC ||
1756                      plmv->lmv_stripe_count != clmv->lmv_stripe_count)) {
1757                 rc = lfsck_namespace_trace_update(env, com, cfid,
1758                                                   LNTF_UNCERTAIN_LMV, true);
1759
1760                 GOTO(out, rc);
1761         }
1762
1763         /* If the master hash type has been set as LMV_HASH_TYPE_UNKNOWN,
1764          * then the slave hash type is not important. */
1765         if ((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) ==
1766             LMV_HASH_TYPE_UNKNOWN &&
1767             plmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE)
1768                 GOTO(out, rc = 0);
1769
1770         /* Unmatched hash type. */
1771         if (unlikely((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) !=
1772                      (clmv->lmv_hash_type & LMV_HASH_TYPE_MASK))) {
1773                 rc = lfsck_namespace_trace_update(env, com, cfid,
1774                                                   LNTF_UNCERTAIN_LMV, true);
1775
1776                 GOTO(out, rc);
1777         }
1778
1779         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
1780                  PFID(cfid), clmv->lmv_master_mdt_index);
1781         name2 = info->lti_tmpbuf2;
1782
1783         rc = lfsck_links_get_first(env, obj, name, tfid);
1784         if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, tfid)) {
1785                 llmv->ll_lmv_verified = 1;
1786
1787                 GOTO(out, rc);
1788         }
1789
1790         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1791                        (const struct dt_key *)name2, BYPASS_CAPA);
1792         if (rc != 0 || !lu_fid_eq(cfid, tfid))
1793                 rc = lfsck_namespace_trace_update(env, com, cfid,
1794                                                   LNTF_UNCERTAIN_LMV, true);
1795         else
1796                 llmv->ll_lmv_verified = 1;
1797
1798         GOTO(out, rc);
1799
1800 out:
1801         if (parent != NULL && !IS_ERR(parent))
1802                 lfsck_object_put(env, parent);
1803
1804         return rc;
1805 }
1806
1807 /**
1808  * Double scan the striped directory or the shard.
1809  *
1810  * All the shards under the given striped directory or its shard have
1811  * been scanned, the LFSCK has got the global knowledge about the LMV
1812  * EA consistency.
1813  *
1814  * If the target is one shard of a striped directory, then only needs to
1815  * update related tracing file.
1816  *
1817  * If the target is the master MDT-object of a striped directory, then the
1818  * LFSCK will make the decision about whether the master LMV EA is invalid
1819  * or not, and repair it if inconsistent; for every shard of the striped
1820  * directory, whether the slave LMV EA is invalid or not, and repair it if
1821  * inconsistent.
1822  *
1823  * \param[in] env       pointer to the thread context
1824  * \param[in] com       pointer to the lfsck component
1825  * \param[in] lnr       pointer to the namespace request that contains the
1826  *                      striped directory or the shard
1827  *
1828  * \retval              zero for succeed
1829  * \retval              negative error number on failure
1830  */
1831 int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
1832                                        struct lfsck_component *com,
1833                                        struct lfsck_namespace_req *lnr)
1834 {
1835         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1836         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1837         struct lfsck_namespace          *ns     = com->lc_file_ram;
1838         struct lfsck_lmv                *llmv   = lnr->lnr_lmv;
1839         struct lmv_mds_md_v1            *lmv    = &llmv->ll_lmv;
1840         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1841         struct dt_object                *dir    = lnr->lnr_obj;
1842         const struct lu_fid             *pfid   = lfsck_dto2fid(dir);
1843         struct lu_seq_range             *range  = &info->lti_range;
1844         struct seq_server_site          *ss     =
1845                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1846         __u32                            stripe_count;
1847         __u32                            hash_type;
1848         int                              rc     = 0;
1849         int                              i;
1850         ENTRY;
1851
1852         if (llmv->ll_lmv_slave) {
1853                 if (llmv->ll_lmv_verified) {
1854                         ns->ln_striped_shards_scanned++;
1855                         lfsck_namespace_trace_update(env, com,
1856                                         lfsck_dto2fid(dir),
1857                                         LNTF_UNCERTAIN_LMV |
1858                                         LNTF_RECHECK_NAME_HASH, false);
1859                 }
1860
1861                 RETURN(0);
1862         }
1863
1864         /* Either the striped directory has been disabled or only part of
1865          * the striped directory have been scanned. The LFSCK cannot repair
1866          * something based on incompleted knowledge. So skip it. */
1867         if (llmv->ll_ignore || llmv->ll_exit_value <= 0)
1868                 RETURN(0);
1869
1870         /* There ever been some failure, as to the LFSCK cannot know whether
1871          * it has got the global knowledge about the LMV EA consistency or not,
1872          * so it cannot make reparation about the incompleted knowledge. */
1873         if (llmv->ll_failed) {
1874                 ns->ln_striped_dirs_scanned++;
1875                 ns->ln_striped_dirs_failed++;
1876
1877                 RETURN(0);
1878         }
1879
1880         if (lmv->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1881                 stripe_count = max(llmv->ll_max_filled_off + 1,
1882                                    llmv->ll_max_stripe_count);
1883         else
1884                 stripe_count = max(llmv->ll_max_filled_off + 1,
1885                                    lmv->lmv_stripe_count);
1886
1887         if (lmv->lmv_stripe_count != stripe_count) {
1888                 lmv->lmv_stripe_count = stripe_count;
1889                 llmv->ll_lmv_updated = 1;
1890         }
1891
1892         if (!lmv_is_known_hash_type(lmv->lmv_hash_type) &&
1893             !(lmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE) &&
1894             lmv_is_known_hash_type(llmv->ll_hash_type)) {
1895                 hash_type = llmv->ll_hash_type & LMV_HASH_TYPE_MASK;
1896                 lmv->lmv_hash_type = llmv->ll_hash_type;
1897                 llmv->ll_lmv_updated = 1;
1898         } else {
1899                 hash_type = lmv->lmv_hash_type & LMV_HASH_TYPE_MASK;
1900                 if (!lmv_is_known_hash_type(hash_type))
1901                         hash_type = LMV_HASH_TYPE_UNKNOWN;
1902         }
1903
1904         if (llmv->ll_lmv_updated) {
1905                 lmv->lmv_layout_version++;
1906                 rc = lfsck_namespace_update_lmv(env, com, dir, lmv, false);
1907                 if (rc != 0)
1908                         RETURN(rc);
1909
1910                 ns->ln_striped_dirs_scanned++;
1911                 ns->ln_striped_dirs_repaired++;
1912         }
1913
1914         fld_range_set_mdt(range);
1915         for (i = 0; i <= llmv->ll_max_filled_off; i++) {
1916                 struct dt_object *obj = NULL;
1917                 struct lfsck_slave_lmv_rec *lslr = llmv->ll_lslr + i;
1918                 const struct lu_fid *cfid = &lslr->lslr_fid;
1919                 const struct lu_name *cname;
1920                 struct linkea_data ldata = { NULL };
1921                 int len;
1922                 int rc1 = 0;
1923                 bool repair_linkea = false;
1924                 bool repair_lmvea = false;
1925                 bool rename = false;
1926                 bool create = false;
1927                 bool linkea_repaired = false;
1928                 bool lmvea_repaired = false;
1929                 bool rename_repaired = false;
1930                 bool create_repaired = false;
1931
1932                 /* LMV EA hole. */
1933                 if (fid_is_zero(cfid))
1934                         continue;
1935
1936                 len = snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1937                                DFID":%u", PFID(cfid), i);
1938                 cname = lfsck_name_get_const(env, info->lti_tmpbuf, len);
1939                 memcpy(lnr->lnr_name, info->lti_tmpbuf, len);
1940
1941                 obj = lfsck_object_find_bottom_nowait(env, lfsck, cfid);
1942                 if (IS_ERR(obj)) {
1943                         if (lfsck_is_dead_obj(dir))
1944                                 RETURN(0);
1945
1946                         rc1 = PTR_ERR(obj);
1947                         goto next;
1948                 }
1949
1950                 switch (lslr->lslr_flags) {
1951                 case LSLF_NONE:
1952                         if (llmv->ll_inline ||
1953                             lslr->lslr_stripe_count != stripe_count ||
1954                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
1955                              hash_type)
1956                                 repair_lmvea = true;
1957                         break;
1958                 case LSLF_BAD_INDEX2:
1959                         /* The index in the slave LMV EA is right,
1960                          * the name entry should be updated. */
1961                         rename = true;
1962                         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2),
1963                                  DFID":%u", PFID(cfid), lslr->lslr_index);
1964                         if (llmv->ll_inline ||
1965                             lslr->lslr_stripe_count != stripe_count ||
1966                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
1967                              hash_type)
1968                                 repair_lmvea = true;
1969                         break;
1970                 case LSLF_BAD_INDEX1:
1971                         /* The index in the name entry is right,
1972                          * the slave LMV EA should be updated. */
1973                 case LSLF_NO_LMVEA:
1974                         repair_lmvea = true;
1975                         break;
1976                 case LSLF_DANGLING:
1977                         create = true;
1978                         goto repair;
1979                 default:
1980                         break;
1981                 }
1982
1983                 rc1 = lfsck_links_read(env, obj, &ldata);
1984                 if (rc1 == -ENOENT) {
1985                         create = true;
1986                         goto repair;
1987                 }
1988
1989                 if (rc1 == -EINVAL || rc1 == -ENODATA) {
1990                         repair_linkea = true;
1991                         goto repair;
1992                 }
1993
1994                 if (rc1 != 0)
1995                         goto next;
1996
1997                 if (ldata.ld_leh->leh_reccount != 1) {
1998                         repair_linkea = true;
1999                         goto repair;
2000                 }
2001
2002                 rc1 = linkea_links_find(&ldata, cname, pfid);
2003                 if (rc1 != 0)
2004                         repair_linkea = true;
2005
2006 repair:
2007                 if (create) {
2008                         rc1 = lfsck_namespace_repair_dangling(env, com,
2009                                                               obj, lnr);
2010                         if (rc1 >= 0) {
2011                                 create_repaired = true;
2012                                 if (rc == 0)
2013                                         ns->ln_dangling_repaired++;
2014                         }
2015                 }
2016
2017                 if (repair_lmvea) {
2018                         *lmv2 = *lmv;
2019                         lmv2->lmv_magic = LMV_MAGIC_STRIPE;
2020                         lmv2->lmv_stripe_count = stripe_count;
2021                         lmv2->lmv_master_mdt_index = i;
2022                         lmv2->lmv_hash_type = hash_type;
2023
2024                         rc1 = lfsck_namespace_update_lmv(env, com, obj,
2025                                                          lmv2, false);
2026                         if (rc1 < 0)
2027                                 goto next;
2028
2029                         if (dt_object_remote(obj)) {
2030                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2031                                         fid_seq(lfsck_dto2fid(obj)), range);
2032                                 if (rc1 != 0)
2033                                         goto next;
2034
2035                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2036                                                 com, obj, LE_SET_LMV_SLAVE, 0,
2037                                                 range->lsr_index);
2038                         } else {
2039                                 ns->ln_striped_shards_repaired++;
2040                                 rc1 = lfsck_namespace_trace_update(env, com,
2041                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2042                         }
2043
2044                         if (rc1 < 0)
2045                                 goto next;
2046
2047                         if (rc1 >= 0)
2048                                 lmvea_repaired = true;
2049                 } else if (llmv->ll_inline) {
2050                         if (dt_object_remote(obj)) {
2051                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2052                                         fid_seq(lfsck_dto2fid(obj)), range);
2053                                 if (rc1 != 0)
2054                                         goto next;
2055
2056                                 /* The slave LMV EA on the remote shard is
2057                                  * correct, just notify the LFSCK instance
2058                                  * on such MDT to re-verify the name_hash. */
2059                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2060                                                 com, obj, LE_SET_LMV_SLAVE,
2061                                                 LEF_RECHECK_NAME_HASH,
2062                                                 range->lsr_index);
2063                         } else {
2064                                 rc1 = lfsck_namespace_trace_update(env, com,
2065                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2066                         }
2067
2068                         if (rc1 < 0)
2069                                 goto next;
2070                 }
2071
2072                 if (rename) {
2073                         rc1 = lfsck_namespace_repair_dirent(env, com, dir, obj,
2074                                         info->lti_tmpbuf2, lnr->lnr_name,
2075                                         lnr->lnr_type, true, false);
2076                         if (rc1 >= 0) {
2077                                 rename_repaired = true;
2078                                 if (rc1 > 0) {
2079                                         ns->ln_dirent_repaired++;
2080                                         rc1 = lfsck_namespace_trace_update(env,
2081                                                 com, cfid,
2082                                                 LNTF_RECHECK_NAME_HASH, true);
2083                                 }
2084                         }
2085
2086                         if (rc1 < 0)
2087                                 goto next;
2088                 }
2089
2090                 if (repair_linkea) {
2091                         struct lustre_handle lh = { 0 };
2092
2093                         rc1 = linkea_data_new(&ldata, &info->lti_big_buf);
2094                         if (rc1 != 0)
2095                                 goto next;
2096
2097                         rc1 = linkea_add_buf(&ldata, cname, lfsck_dto2fid(dir));
2098                         if (rc1 != 0)
2099                                 goto next;
2100
2101                         rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
2102                                                MDS_INODELOCK_UPDATE |
2103                                                MDS_INODELOCK_XATTR, LCK_EX);
2104                         lfsck_ibits_unlock(&lh, LCK_EX);
2105                         if (rc1 != 0)
2106                                 goto next;
2107
2108                         rc1 = lfsck_namespace_rebuild_linkea(env, com, obj,
2109                                                              &ldata);
2110                         if (rc1 >= 0) {
2111                                 linkea_repaired = true;
2112                                 if (rc1 > 0)
2113                                         ns->ln_linkea_repaired++;
2114                         }
2115                 }
2116
2117 next:
2118                 CDEBUG(D_LFSCK, "%s: namespace LFSCK repair the shard "
2119                       "%d "DFID" of the striped directory "DFID" with "
2120                       "dangling %s/%s, rename %s/%s, llinkea %s/%s, "
2121                       "repair_lmvea %s/%s: rc = %d\n", lfsck_lfsck2name(lfsck),
2122                       i, PFID(cfid), PFID(&lnr->lnr_fid),
2123                       create ? "yes" : "no", create_repaired ? "yes" : "no",
2124                       rename ? "yes" : "no", rename_repaired ? "yes" : "no",
2125                       repair_linkea ? "yes" : "no",
2126                       linkea_repaired ? "yes" : "no",
2127                       repair_lmvea ? "yes" : "no",
2128                       lmvea_repaired ? "yes" : "no", rc1);
2129
2130                 if (obj != NULL && !IS_ERR(obj))
2131                         lfsck_object_put(env, obj);
2132
2133                 if (rc1 < 0) {
2134                         rc = rc1;
2135                         ns->ln_striped_shards_failed++;
2136                 }
2137         }
2138
2139         RETURN(rc);
2140 }
2141
2142 /**
2143  * Verify the shard's name entry under the striped directory.
2144  *
2145  * Before all shards of the striped directory scanned, the LFSCK cannot
2146  * know whether the master LMV EA is valid or not, and also cannot know
2147  * how to repair an invalid shard exactly. For example, the stripe index
2148  * stored in the shard's name does not match the stripe index stored in
2149  * the slave LMV EA, then the LFSCK cannot know which one is correct.
2150  * If the LFSCK just assumed one is correct, and fixed the other, then
2151  * as the LFSCK proceeds, it may find that the former repair was
2152  * wrong and have to roll back. Unfortunately, if some applications saw
2153  * the changes and made further modification based on such changes, then
2154  * the roll back is almost impossible.
2155  *
2156  * To avoid the above trouble, the LFSCK will scan the master object of
2157  * the striped directory twice. That is NOT the same as the normal
2158  * two-stage scanning: both scans of the striped directory happen
2159  * during the first-stage scanning:
2160  *
2161  * 1) When the striped directory is opened for scanning, the LFSCK will
2162  *    iterate each shard in turn, and records its slave LMV EA in the
2163  *    lfsck_lmv::ll_lslr. In this step, if the 'shard' (may be fake
2164  *    shard) name does not match the shard naming rule, for example, it
2165  *    does not contain the shard's FID or the index, then we
2166  *    can remove the bad name entry directly. But if the name is valid,
2167  *    but the shard has no slave LMV EA or the slave LMV EA does not
2168  *    match its name, then we just record related information in the
2169  *    lfsck_lmv::ll_lslr in RAM.
2170  *
2171  * 2) When all the known shards have been scanned, then the engine will
2172  *    generate a dummy request (via lfsck_namespace_close_dir) to tell
2173  *    the assistant thread that all the known shards have been scanned.
2174  *    Since the assistant then has the global knowledge about the index
2175  *    conflicts, stripe count, hash type, and so on, the assistant
2176  *    thread will scan the lfsck_lmv::ll_lslr, and for every shard in
2177  *    the record, check and repair inconsistency.
2178  *
2179  * Generally, a striped directory has only several shards, and there
2180  * will NOT be a lot of striped directories. So double scanning striped
2181  * directories will not affect the LFSCK performance much.
2182  *
2183  * \param[in] env       pointer to the thread context
2184  * \param[in] com       pointer to the lfsck component
2185  * \param[in] lnr       pointer to the namespace request that contains the
2186  *                      shard's name, parent object, parent's LMV, etc.
2187  *
2188  * \retval              zero on success
2189  * \retval              negative error number on failure
2190  */
2191 int lfsck_namespace_handle_striped_master(const struct lu_env *env,
2192                                           struct lfsck_component *com,
2193                                           struct lfsck_namespace_req *lnr)
2194 {
2195         struct lfsck_thread_info   *info        = lfsck_env_info(env);
2196         struct lmv_mds_md_v1       *lmv         = &info->lti_lmv;
2197         struct lfsck_instance      *lfsck       = com->lc_lfsck;
2198         struct lfsck_namespace     *ns          = com->lc_file_ram;
2199         struct lfsck_lmv           *llmv        = lnr->lnr_lmv;
2200         struct dt_object           *dir         = lnr->lnr_obj;
2201         const struct lu_fid        *pfid        = lfsck_dto2fid(dir);
2202         struct dt_object           *obj         = NULL;
2203         struct dt_device           *dev         = NULL;
2204         int                         shard_idx   = 0;
2205         int                         stripe      = 0;
2206         int                         rc          = 0;
2207         int                         depth       = 0;
2208         bool                        repaired    = false;
2209         enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
2210         ENTRY;
2211
             /* The parent's LMV is flagged to be ignored: nothing to verify. */
2212         if (unlikely(llmv->ll_ignore))
2213                 RETURN(0);
2214
             /* Map the FID stored in the name entry to an MDT index. A negative
              * value is an error code; it marks the whole LMV as failed. */
2215         shard_idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
2216         if (shard_idx < 0)
2217                 GOTO(fail_lmv, rc = shard_idx);
2218
             /* Pick the device used to look the shard up: li_next when the index
              * equals the one of li_bottom, otherwise the target registered for
              * that MDT index. */
2219         if (shard_idx == lfsck_dev_idx(lfsck->li_bottom)) {
                     /* The ".." entry needs no further handling in this case. */
2220                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
2221                         GOTO(out, rc = 0);
2222
2223                 dev = lfsck->li_next;
2224         } else {
2225                 struct lfsck_tgt_desc *ltd;
2226
2227                 /* Usually, some local filesystem consistency verification
2228                  * tools can guarantee the local namespace tree consistency.
2229                  * So the LFSCK will only verify the remote directory. */
2230                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
                             /* Only flag the parent with LNTF_CHECK_PARENT in the
                              * trace; nothing else is done for ".." here. */
2231                         rc = lfsck_namespace_trace_update(env, com, pfid,
2232                                                 LNTF_CHECK_PARENT, true);
2233
2234                         GOTO(out, rc);
2235                 }
2236
2237                 ltd = LTD_TGT(&lfsck->li_mdt_descs, shard_idx);
2238                 if (unlikely(ltd == NULL)) {
2239                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
2240                                "did not join the namespace LFSCK\n",
2241                                lfsck_lfsck2name(lfsck), shard_idx);
                             /* NOTE(review): presumably records this MDT as one
                              * that could not be verified - confirm against
                              * lfsck_lad_set_bitmap(). */
2242                         lfsck_lad_set_bitmap(env, com, shard_idx);
2243
2244                         GOTO(fail_lmv, rc = -ENODEV);
2245                 }
2246
2247                 dev = ltd->ltd_tgt;
2248         }
2249
2250         obj = lfsck_object_find_by_dev_nowait(env, dev, &lnr->lnr_fid);
2251         if (IS_ERR(obj)) {
                     /* If the parent directory itself is dead, return quietly:
                      * this path neither sets ll_failed nor touches the
                      * counters updated after the "out" label. */
2252                 if (lfsck_is_dead_obj(dir))
2253                         RETURN(0);
2254
2255                 GOTO(fail_lmv, rc = PTR_ERR(obj));
2256         }
2257
2258         if (!dt_object_exists(obj)) {
                     /* The shard object does not exist, so the type recorded in
                      * the name entry (lnr_type) is used to parse the stripe
                      * index out of the name. A negative result means the name
                      * is not a valid shard name. */
2259                 stripe = lfsck_shard_name_to_index(env, lnr->lnr_name,
2260                                 lnr->lnr_namelen, lnr->lnr_type, &lnr->lnr_fid);
2261                 if (stripe < 0) {
2262                         type = LNIT_BAD_DIRENT;
2263
2264                         GOTO(out, rc = 0);
2265                 }
2266
                     /* Also entered via "goto dangling" further down, when reading
                      * the slave LMV EA returns -ENOENT; "stripe" has already been
                      * computed on that path as well. */
2267 dangling:
                     /* NOTE(review): presumably rc == 0 means the name entry is
                      * still present - confirm against
                      * lfsck_namespace_check_exist(). In that case a placeholder
                      * LMV (all zero except the magic) is recorded for this
                      * stripe with the LSLF_DANGLING flag. */
2268                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
2269                 if (rc == 0) {
2270                         memset(lmv, 0, sizeof(*lmv));
2271                         lmv->lmv_magic = LMV_MAGIC;
2272                         rc = lfsck_record_lmv(env, com, lnr, lmv, stripe,
2273                                               LSLF_DANGLING, LSLF_NONE, &depth);
2274                 }
2275
2276                 GOTO(out, rc);
2277         }
2278
             /* The shard object exists: parse the stripe index from the name
              * using the object's own type rather than the type stored in the
              * name entry. */
2279         stripe = lfsck_shard_name_to_index(env, lnr->lnr_name, lnr->lnr_namelen,
2280                                            lfsck_object_type(obj),
2281                                            &lnr->lnr_fid);
2282         if (stripe < 0) {
2283                 type = LNIT_BAD_DIRENT;
2284
2285                 GOTO(out, rc = 0);
2286         }
2287
2288         rc = lfsck_read_stripe_lmv(env, obj, lmv);
2289         if (unlikely(rc == -ENOENT))
2290                 /* It may happen when the remote object has been removed,
2291                  * but the local MDT is not aware of that. */
2292                 goto dangling;
2293
             /* Nothing is repaired here; the finding is only recorded:
              * -ENODATA -> recorded with LSLF_NO_LMVEA;
              * 0        -> recorded with LSLF_BAD_INDEX1 when the index stored
              *             in the LMV EA differs from the one in the name,
              *             otherwise with LSLF_NONE;
              * any other error is passed to "out" unchanged. */
2294         if (rc == -ENODATA)
2295                 rc = lfsck_record_lmv(env, com, lnr, lmv, stripe,
2296                                       LSLF_NO_LMVEA, LSLF_NONE, &depth);
2297         else if (rc == 0)
2298                 rc = lfsck_record_lmv(env, com, lnr, lmv, stripe,
2299                                       lmv->lmv_master_mdt_index != stripe ?
2300                                       LSLF_BAD_INDEX1 : LSLF_NONE, LSLF_NONE,
2301                                       &depth);
2302
2303         GOTO(out, rc);
2304
         /* Failures that prevent the shard from being examined at all mark the
          * in-RAM LMV as failed, then share the common exit path below. */
2305 fail_lmv:
2306         llmv->ll_failed = 1;
2307
2308 out:
             /* No error and no inconsistency classified so far, but the name entry
              * claims a non-directory type: classify it as a bad type. */
2309         if (rc >= 0 && type == LNIT_NONE && !S_ISDIR(lnr->lnr_type))
2310                 type = LNIT_BAD_TYPE;
2311
             /* Both repairs call the same helper with the same name passed twice;
              * they differ only in the second-to-last argument.
              * NOTE(review): presumably "true" updates the entry in place and
              * "false" removes it - confirm against
              * lfsck_namespace_repair_dirent(). A positive return value is
              * counted as an actual repair. */
2312         switch (type) {
2313         case LNIT_BAD_TYPE:
2314                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2315                                                    lnr->lnr_name, lnr->lnr_name,
2316                                                    lnr->lnr_type, true, false);
2317                 if (rc > 0)
2318                         repaired = true;
2319                 break;
2320         case LNIT_BAD_DIRENT:
2321                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2322                                                    lnr->lnr_name, lnr->lnr_name,
2323                                                    lnr->lnr_type, false, false);
2324                 if (rc > 0)
2325                         repaired = true;
2326                 break;
2327         default:
2328                 break;
2329         }
2330
2331         if (rc < 0) {
2332                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
2333                        "the shard: "DFID", parent "DFID", name %.*s: rc = %d\n",
2334                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
2335                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
2336                        lnr->lnr_namelen, lnr->lnr_name, rc);
2337
                     /* For this set of connection-related errors on a device other
                      * than li_next, call lfsck_lad_set_bitmap() for the MDT. */
2338                 if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
2339                      rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
2340                      rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
2341                     dev != NULL && dev != lfsck->li_next)
2342                         lfsck_lad_set_bitmap(env, com, shard_idx);
2343
                     /* The error is reported to the caller only when LPF_FAILOUT
                      * is set in the bookmark parameters; otherwise it is
                      * swallowed. */
2344                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
2345                         rc = 0;
2346         } else {
                     /* Account for a successful repair, then normalize any
                      * positive rc to zero. */
2347                 if (repaired) {
2348                         ns->ln_items_repaired++;
2349
2350                         switch (type) {
2351                         case LNIT_BAD_TYPE:
2352                                 ns->ln_bad_type_repaired++;
2353                                 break;
2354                         case LNIT_BAD_DIRENT:
2355                                 ns->ln_dirent_repaired++;
2356                                 break;
2357                         default:
2358                                 break;
2359                         }
2360                 }
2361
2362                 rc = 0;
2363         }
2364
             /* obj is NULL on the paths that jump here before the lookup and an
              * ERR_PTR when the lookup failed; only a real object is released. */
2365         if (obj != NULL && !IS_ERR(obj))
2366                 lfsck_object_put(env, obj);
2367
             /* NOTE(review): plain "return" although the function begins with
              * ENTRY and its early exits use RETURN() - confirm whether
              * RETURN(rc) was intended. */
2368         return rc;
2369 }