Whamcloud - gitweb
LU-5519 lfsck: repair master LMV for striped directory
[fs/lustre-release.git] / lustre / lfsck / lfsck_striped_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_striped_dir.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 /*
32  * About the verification for striped directory. Some rules and assumptions:
33  *
34  * 1) lmv_magic: The magic may be wrong. But it is almost impossible (1/2^32
35  *    probability) that a master LMV EA claims as a slave LMV EA by wrong,
36  *    so we can ignore such race case and the reverse case.
37  *
38  * 2) lmv_master_mdt_index: The master index can be self-verified by compared
39  *    with the MDT index directly. The slave stripe index can be verified by
40  *    compared with the file name. Although both the name entry and the LMV EA
41  *    can be wrong, it is almost impossible that they hit the same bad data
42  *    So if they match each other, then trust them. Similarly, for the shard,
43  *    it stores index in both slave LMV EA and in linkEA, if the two copies
44  *    match, then trust them.
45  *
46  * 3) lmv_hash_type: The valid hash type should be LMV_HASH_TYPE_ALL_CHARS or
47  *    LMV_HASH_TYPE_FNV_1A_64. If the LFSCK instance on some slave finds that
48  *    the name hash against the hash function does not match the MDT, then it
49  *    will change the master LMV EA hash type as LMV_HASH_TYPE_UNKNOWN. With
50  *    such hash type, the whole striped directory still can be accessed via
51  *    lookup/readdir, and also support unlink, but cannot add new name entry.
52  *
53  * 3.1) If the master hash type is one of the valid values, then trust the
54  *      master LMV EA. Because:
55  *
56  * 3.1.1) The master hash type is visible to the client and used by the client.
57  *
58  * 3.1.2) For a given name, different hash types may map the name entry to the
59  *        same MDT. So simply checking one name entry or some name entries may
60  *        cannot verify whether the hash type is correct or not.
61  *
62  * 3.1.3) Different shards can claim different hash types, it is not easy to
63  *        distinguish which ones are correct. Even though the master is wrong,
64  *        as the LFSCK processing, some LFSCK instance on other MDT may finds
65  *        unmatched name hash, then it will change the master hash type to
66  *        LMV_HASH_TYPE_UNKNOWN as described above. The worst case is euqal
67  *        to the case without the LFSCK.
68  *
69  * 3.2) If the master hash type is invalid, nor LMV_HASH_TYPE_UNKNOWN, then
70  *      trust the first shard with valid hash type (ALL_CHARS or FNV_1A_64).
71  *      If the shard is also worng, means there are double failures, then as
72  *      the LFSCK processing, other LFSCK instances on the other MDTs may
73  *      find unmatched name hash, and then, the master hash type will be
74  *      changed to LMV_HASH_TYPE_UNKNOWN as described in the 3).
75  *
76  * 3.3) If the master hash type is LMV_HASH_TYPE_UNKNOWN, then it is possible
77  *      that some other LFSCK instance on other MDT found bad name hash, then
78  *      changed the master hash type to LMV_HASH_TYPE_UNKNOWN as described in
79  *      the 3). But it also maybe because of data corruption in master LMV EA.
80  *      To make such two cases to be distinguishable, when the LFSCK changes
81  *      the master hash type to LMV_HASH_TYPE_UNKNOWN, it will mark in the
82  *      master LMV EA (new lmv flags LMV_HASH_FLAG_BAD_TYPE). Then subsequent
83  *      LFSCK checking can distinguish them: for former case, turst the master
84  *      LMV EA with nothing to be done; otherwise, trust the first shard with
85  *      valid hash type (ALL_CHARS or FNV_1A_64) as the 3.2) does.
86  *
87  * 4) lmv_stripe_count: For a shard of a striped directory, if its index has
88  *    been verified as the 2), then the stripe count must be larger than its
89  *    index. For the master object, by scanning each shard's index, the LFSCK
90  *    can know the highest index, and the stripe count must be larger than the
91  *    known highest index. If the stipe count in the LMV EA matches above two
92  *    rules, then it is may be trustable. If both the master claimed stripe
93  *    count and the slave claimed stripe count match each own rule, but they
94  *    are not the same, then trust the master. Because the stripe count in
95  *    the master LMV EA is visible to client and used to distribute the name
96  *    entry to some shard, but the slave LMV EA is only used for verification
97  *    and invisible to client.
98  *
99  * 5) If the master LMV EA is lost, then there are two possible cases:
100  *
101  * 5.1) The slave claims slave LMV EA by wrong, means that the parent was not
102  *      a striped directory, but its sub-directory has a wrong slave LMV EA.
103  *      It is very very race case, similar as the 1), can be ignored.
104  *
105  * 5.2) The parent directory is a striped directory, but the master LMV EA
106  *      is lost or crashed. Then the LFSCK needs to re-generate the master
107  *      LMV EA: the lmv_master_mdt_index is from the MDT device index; the
108  *      lmv_hash_type is from the first valid shard; the lmv_stripe_count
109  *      will be calculated via scanning all the shards.
110  *
111  * 5.2.1) Before re-generating the master LMV EA, the LFSCK needs to check
112  *        whether someone has created some file(s) under the master object
113  *        after the master LMV EA disappear. If yes, the LFSCK will cannot
114  *        re-generate the master LMV EA, otherwise, such new created files
115  *        will be invisible to client. Under such case, the LFSCK will mark
116  *        the master object as read only (without master LMV EA). Then all
117  *        things under the master MDT-object, including those new created
118  *        files and the shards themselves, will be visibile to client. And
119  *        then the administrator can handle the bad striped directory with
120  *        more human knowledge.
121  *
122  * 5.2.2) If someone created some special sub-directory under the master
123  *        MDT-object with the same naming rule as shard name $FID:$index,
124  *        as to the LFSCK cannot detect it before re-generating the master
125  *        LMV EA, then such sub-directory itself will be invisible after
126  *        the LFSCK re-generating the master LMV EA. The sub-items under
127  *        such sub-directory are still visible to client. As the LFSCK
128  *        processing, if such sub-directory cause some conflict with other
129  *        normal shard, such as the index conflict, then the LFSCK will
130  *        remove the master LMV EA and change the master MDT-object to
131  *        read-only mode as the 5.2.1). But if there is no conflict, the
132  *        LFSCK will regard such sub-directory as a striped shard that
133  *        lost its slave LMV EA, and will re-generate slave LMV EA for it.
134  *
135  * 5.2.3) Anytime, if the LFSCK found some shards name/index conflict,
136  *        and cannot make the distinguish which one is right, then it
137  *        will remove the master LMV EA and change the MDT-object to
138  *        read-only mode as the 5.2.2).
139  */
140
141 #define DEBUG_SUBSYSTEM S_LFSCK
142
143 #include <lustre/lustre_idl.h>
144 #include <lu_object.h>
145 #include <dt_object.h>
146 #include <md_object.h>
147 #include <lustre_fid.h>
148 #include <lustre_lib.h>
149 #include <lustre_net.h>
150 #include <lustre_lmv.h>
151 #include <lustre/lustre_user.h>
152
153 #include "lfsck_internal.h"
154
155 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
156 {
157         if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
158                 if (llmv->ll_inline) {
159                         struct lfsck_lmv_unit   *llu;
160                         struct lfsck_instance   *lfsck;
161
162                         llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
163                         lfsck = llu->llu_lfsck;
164
165                         spin_lock(&lfsck->li_lock);
166                         list_del(&llu->llu_link);
167                         spin_unlock(&lfsck->li_lock);
168
169                         lfsck_object_put(env, llu->llu_obj);
170
171                         LASSERT(llmv->ll_lslr != NULL);
172
173                         OBD_FREE_LARGE(llmv->ll_lslr,
174                                        sizeof(*llmv->ll_lslr) *
175                                        llmv->ll_stripes_allocated);
176                         OBD_FREE_PTR(llu);
177                 } else {
178                         if (llmv->ll_lslr != NULL)
179                                 OBD_FREE_LARGE(llmv->ll_lslr,
180                                         sizeof(*llmv->ll_lslr) *
181                                         llmv->ll_stripes_allocated);
182
183                         OBD_FREE_PTR(llmv);
184                 }
185         }
186 }
187
188 /**
189  * Mark the specified directory as read-only by set LUSTRE_IMMUTABLE_FL.
190  *
191  * The caller has taken the ldlm lock on the @obj already.
192  *
193  * \param[in] env       pointer to the thread context
194  * \param[in] com       pointer to the lfsck component
195  * \param[in] obj       pointer to the object to be handled
196  * \param[in] del_lmv   true if need to drop the LMV EA
197  *
198  * \retval              positive number if nothing to be done
199  * \retval              zero for succeed
200  * \retval              negative error number on failure
201  */
202 static int lfsck_disable_master_lmv(const struct lu_env *env,
203                                     struct lfsck_component *com,
204                                     struct dt_object *obj, bool del_lmv)
205 {
206         struct lfsck_thread_info        *info   = lfsck_env_info(env);
207         struct lu_attr                  *la     = &info->lti_la;
208         struct lfsck_instance           *lfsck  = com->lc_lfsck;
209         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
210         struct thandle                  *th     = NULL;
211         int                              rc     = 0;
212         ENTRY;
213
214         th = dt_trans_create(env, dev);
215         if (IS_ERR(th))
216                 GOTO(log, rc = PTR_ERR(th));
217
218         if (del_lmv) {
219                 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
220                 if (rc != 0)
221                         GOTO(stop, rc);
222         }
223
224         la->la_valid = LA_FLAGS;
225         rc = dt_declare_attr_set(env, obj, la, th);
226         if (rc != 0)
227                 GOTO(stop, rc);
228
229         rc = dt_trans_start_local(env, dev, th);
230         if (rc != 0)
231                 GOTO(stop, rc);
232
233         dt_write_lock(env, obj, 0);
234         if (unlikely(lfsck_is_dead_obj(obj)))
235                 GOTO(unlock, rc = 1);
236
237         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
238                 GOTO(unlock, rc = 0);
239
240         if (del_lmv) {
241                 rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th, BYPASS_CAPA);
242                 if (rc != 0)
243                         GOTO(unlock, rc);
244         }
245
246         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
247         if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
248                 la->la_valid = LA_FLAGS;
249                 la->la_flags |= LUSTRE_IMMUTABLE_FL;
250                 rc = dt_attr_set(env, obj, la, th, BYPASS_CAPA);
251         }
252
253         GOTO(unlock, rc);
254
255 unlock:
256         dt_write_unlock(env, obj);
257
258 stop:
259         dt_trans_stop(env, dev, th);
260
261 log:
262         CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
263                "the striped directory "DFID" as read-only: rc = %d\n",
264                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
265
266         if (rc <= 0) {
267                 struct lfsck_namespace *ns = com->lc_file_ram;
268
269                 ns->ln_flags |= LF_INCONSISTENT;
270                 if (rc == 0)
271                         ns->ln_striped_dirs_disabled++;
272         }
273
274         return rc;
275 }
276
277 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
278 {
279         return lmv->lmv_stripe_count >= 1 &&
280                lmv->lmv_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
281                lmv->lmv_stripe_count > lmv->lmv_master_mdt_index &&
282                lmv_is_known_hash_type(lmv->lmv_hash_type);
283 }
284
285 int lfsck_read_stripe_lmv(const struct lu_env *env, struct dt_object *obj,
286                           struct lmv_mds_md_v1 *lmv)
287 {
288         struct dt_object *bottom;
289         int               rc;
290
291         /* Currently, we only store the LMV header on disk. It is the LOD's
292          * duty to iterate the master MDT-object's directory to compose the
293          * integrated LMV EA. But here, we only want to load the LMV header,
294          * so we need to bypass LOD to avoid unnecessary iteration in LOD. */
295         bottom = lu2dt(container_of0(obj->do_lu.lo_header->loh_layers.prev,
296                                      struct lu_object, lo_linkage));
297         if (unlikely(bottom == NULL))
298                 return -ENOENT;
299
300         dt_read_lock(env, bottom, 0);
301         rc = dt_xattr_get(env, bottom, lfsck_buf_get(env, lmv, sizeof(*lmv)),
302                           XATTR_NAME_LMV, BYPASS_CAPA);
303         dt_read_unlock(env, bottom);
304         if (rc != sizeof(*lmv))
305                 return rc > 0 ? -EINVAL : rc;
306
307         lfsck_lmv_header_le_to_cpu(lmv, lmv);
308         if ((lmv->lmv_magic == LMV_MAGIC &&
309              !(lmv->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) ||
310             (lmv->lmv_magic == LMV_MAGIC_STRIPE &&
311              !(lmv->lmv_hash_type & LMV_HASH_FLAG_DEAD)))
312                 return 0;
313
314         return -ENODATA;
315 }
316
317 /**
318  * Parse the shard's index from the given shard name.
319  *
320  * The valid shard name/type should be:
321  * 1) The type must be S_IFDIR
322  * 2) The name should be $FID:$index
323  * 3) the index should within valid range.
324  *
325  * \param[in] env       pointer to the thread context
326  * \param[in] name      the shard name
327  * \param[in] namelen   the name length
328  * \param[in] type      the entry's type
329  * \param[in] fid       the entry's FID
330  *
331  * \retval              zero or positive number for the index from the name
332  * \retval              negative error number on failure
333  */
334 int lfsck_shard_name_to_index(const struct lu_env *env, const char *name,
335                               int namelen, __u16 type, const struct lu_fid *fid)
336 {
337         char    *name2  = lfsck_env_info(env)->lti_tmpbuf2;
338         int      len;
339         int      idx    = 0;
340
341         if (!S_ISDIR(type))
342                 return -ENOTDIR;
343
344         LASSERT(name != name2);
345
346         len = snprintf(name2, sizeof(lfsck_env_info(env)->lti_tmpbuf2),
347                        DFID":", PFID(fid));
348         if (namelen < len + 1 || memcmp(name, name2, len) != 0)
349                 return -EINVAL;
350
351         do {
352                 if (!isdigit(name[len]))
353                         return -EINVAL;
354
355                 idx = idx * 10 + name[len++] - '0';
356         } while (len < namelen);
357
358         if (idx >= LFSCK_LMV_MAX_STRIPES)
359                 return -EINVAL;
360
361         return idx;
362 }
363
364 bool lfsck_is_valid_slave_name_entry(const struct lu_env *env,
365                                      struct lfsck_lmv *llmv,
366                                      const char *name, int namelen)
367 {
368         struct lmv_mds_md_v1    *lmv;
369         int                      idx;
370
371         if (llmv == NULL || !llmv->ll_lmv_slave || !llmv->ll_lmv_verified)
372                 return true;
373
374         lmv = &llmv->ll_lmv;
375         idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
376                                        lmv->lmv_stripe_count,
377                                        name, namelen);
378         if (unlikely(idx != lmv->lmv_master_mdt_index))
379                 return false;
380
381         return true;
382 }
383
384 /**
385  * Check whether the given name is a valid entry under the @parent.
386  *
387  * If the @parent is a striped directory then the @child should one
388  * shard of the striped directory, its name should be $FID:$index.
389  *
390  * If the @parent is a shard of a striped directory, then the name hash
391  * should match the MDT, otherwise it is invalid.
392  *
393  * \param[in] env       pointer to the thread context
394  * \param[in] parent    the parent directory
395  * \param[in] child     the child object to be checked
396  * \param[in] cname     the name for the @child in the parent directory
397  *
398  * \retval              positive number for invalid name entry
399  * \retval              0 if the name is valid or uncertain
400  * \retval              negative error number on failure
401  */
402 int lfsck_namespace_check_name(const struct lu_env *env,
403                                struct dt_object *parent,
404                                struct dt_object *child,
405                                const struct lu_name *cname)
406 {
407         struct lmv_mds_md_v1    *lmv = &lfsck_env_info(env)->lti_lmv;
408         int                      idx;
409         int                      rc;
410
411         rc = lfsck_read_stripe_lmv(env, parent, lmv);
412         if (rc != 0)
413                 RETURN(rc == -ENODATA ? 0 : rc);
414
415         if (lmv->lmv_magic == LMV_MAGIC_STRIPE) {
416                 if (!lfsck_is_valid_slave_lmv(lmv))
417                         return 0;
418
419                 idx = lmv_name_to_stripe_index(lmv->lmv_hash_type,
420                                                lmv->lmv_stripe_count,
421                                                cname->ln_name,
422                                                cname->ln_namelen);
423                 if (unlikely(idx != lmv->lmv_master_mdt_index))
424                         return 1;
425         } else if (lfsck_shard_name_to_index(env, cname->ln_name,
426                         cname->ln_namelen, lfsck_object_type(child),
427                         lfsck_dto2fid(child)) < 0) {
428                 return 1;
429         }
430
431         return 0;
432 }
433
434 /**
435  * Update the object's LMV EA with the given @lmv.
436  *
437  * \param[in] env       pointer to the thread context
438  * \param[in] com       pointer to the lfsck component
439  * \param[in] obj       pointer to the object which LMV EA will be updated
440  * \param[in] lmv       pointer to buffer holding the new LMV EA
441  * \param[in] locked    whether the caller has held ldlm lock on the @obj or not
442  *
443  * \retval              positive number for nothing to be done
444  * \retval              zero if updated successfully
445  * \retval              negative error number on failure
446  */
447 int lfsck_namespace_update_lmv(const struct lu_env *env,
448                                struct lfsck_component *com,
449                                struct dt_object *obj,
450                                struct lmv_mds_md_v1 *lmv, bool locked)
451 {
452         struct lfsck_thread_info        *info   = lfsck_env_info(env);
453         struct lmv_mds_md_v1            *lmv4   = &info->lti_lmv4;
454         struct lu_buf                   *buf    = &info->lti_buf;
455         struct lfsck_instance           *lfsck  = com->lc_lfsck;
456         struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
457         struct thandle                  *th     = NULL;
458         struct lustre_handle             lh     = { 0 };
459         int                              rc     = 0;
460         int                              rc1    = 0;
461         ENTRY;
462
463         LASSERT(lmv4 != lmv);
464
465         lfsck_lmv_header_cpu_to_le(lmv4, lmv);
466         lfsck_buf_init(buf, lmv4, sizeof(*lmv4));
467
468         if (!locked) {
469                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
470                                       MDS_INODELOCK_UPDATE |
471                                       MDS_INODELOCK_XATTR, LCK_EX);
472                 if (rc != 0)
473                         GOTO(log, rc);
474         }
475
476         th = dt_trans_create(env, dev);
477         if (IS_ERR(th))
478                 GOTO(log, rc = PTR_ERR(th));
479
480         /* For remote updating LMV EA, there will be further LFSCK action on
481          * remote MDT after the updating, so update the LMV EA synchronously. */
482         if (dt_object_remote(obj))
483                 th->th_sync = 1;
484
485         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th);
486         if (rc != 0)
487                 GOTO(stop, rc);
488
489         rc = dt_trans_start_local(env, dev, th);
490         if (rc != 0)
491                 GOTO(stop, rc);
492
493         dt_write_lock(env, obj, 0);
494         if (unlikely(lfsck_is_dead_obj(obj)))
495                 GOTO(unlock, rc = 1);
496
497         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
498                 GOTO(unlock, rc = 0);
499
500         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th, BYPASS_CAPA);
501
502         GOTO(unlock, rc);
503
504 unlock:
505         dt_write_unlock(env, obj);
506
507 stop:
508         rc1 = dt_trans_stop(env, dev, th);
509         if (rc == 0)
510                 rc = rc1;
511
512 log:
513         lfsck_ibits_unlock(&lh, LCK_EX);
514         CDEBUG(D_LFSCK, "%s: namespace LFSCK updated the %s LMV EA "
515                "for the object "DFID": rc = %d\n",
516                lfsck_lfsck2name(lfsck),
517                lmv->lmv_magic == LMV_MAGIC ? "master" : "slave",
518                PFID(lfsck_dto2fid(obj)), rc);
519
520         return rc;
521 }
522
523 /**
524  * Check whether there are non-shard objects under the striped directory.
525  *
526  * If the master MDT-object of the striped directory lost its master LMV EA,
527  * then before the LFSCK repaired the striped directory, some ones may have
528  * created some non-shard objects under the master MDT-object. If such case
529  * happend, then the LFSCK cannot re-generate the lost master LMV EA to keep
530  * those non-shard objects to be visible to client.
531  *
532  * \param[in] env       pointer to the thread context
533  * \param[in] com       pointer to the lfsck component
534  * \param[in] obj       pointer to the master MDT-object to be checked
535  * \param[in] cfid      the shard's FID used for verification
536  * \param[in] cidx      the shard's index used for verification
537  *
538  * \retval              positive number if not allow to re-generate LMV EA
539  * \retval              zero if allow to re-generate LMV EA
540  * \retval              negative error number on failure
541  */
542 static int lfsck_allow_set_master_lmv(const struct lu_env *env,
543                                       struct lfsck_component *com,
544                                       struct dt_object *obj,
545                                       const struct lu_fid *cfid, __u32 cidx)
546 {
547         struct lfsck_thread_info        *info   = lfsck_env_info(env);
548         struct lu_fid                   *tfid   = &info->lti_fid3;
549         struct lfsck_instance           *lfsck  = com->lc_lfsck;
550         struct lu_dirent                *ent    =
551                         (struct lu_dirent *)info->lti_key;
552         const struct dt_it_ops          *iops;
553         struct dt_it                    *di;
554         __u64                            cookie;
555         __u32                            args;
556         int                              rc;
557         __u16                            type;
558         ENTRY;
559
560         if (unlikely(!dt_try_as_dir(env, obj)))
561                 RETURN(-ENOTDIR);
562
563         /* Check whether the shard and the master MDT-object matches or not. */
564         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
565                  PFID(cfid), cidx);
566         rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
567                        (const struct dt_key *)info->lti_tmpbuf, BYPASS_CAPA);
568         if (rc != 0)
569                 RETURN(rc);
570
571         if (!lu_fid_eq(tfid, cfid))
572                 RETURN(-ENOENT);
573
574         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
575         iops = &obj->do_index_ops->dio_it;
576         di = iops->init(env, obj, args, BYPASS_CAPA);
577         if (IS_ERR(di))
578                 RETURN(PTR_ERR(di));
579
580         rc = iops->load(env, di, 0);
581         if (rc == 0)
582                 rc = iops->next(env, di);
583         else if (rc > 0)
584                 rc = 0;
585
586         if (rc != 0)
587                 GOTO(out, rc);
588
589         do {
590                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
591                 if (rc == 0)
592                         rc = lfsck_unpack_ent(ent, &cookie, &type);
593
594                 if (rc != 0)
595                         GOTO(out, rc);
596
597                 /* skip dot and dotdot entries */
598                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
599                         goto next;
600
601                 /* If the subdir name does not match the shard name rule, then
602                  * it is quite possible that it is NOT a shard, but created by
603                  * someone after the master MDT-object lost the master LMV EA.
604                  * But it is also possible that the subdir name entry crashed,
605                  * under such double failure cases, the LFSCK cannot know how
606                  * to repair the inconsistency. For data safe, the LFSCK will
607                  * mark the master MDT-object as read-only. The administrator
608                  * can fix the bad shard name manually, then run LFSCK again.
609                  *
610                  * XXX: If the subdir name matches the shard name rule, but it
611                  *      is not a real shard of the striped directory, instead,
612                  *      it was created by someone after the master MDT-object
613                  *      lost the LMV EA, then re-generating the master LMV EA
614                  *      will cause such subdir to be invisible to client, and
615                  *      if its index occupies some lost shard index, then the
616                  *      LFSCK will use it to replace the bad shard, and cause
617                  *      the subdir (itself) to be invisible for ever. */
618                 if (lfsck_shard_name_to_index(env, ent->lde_name,
619                                 ent->lde_namelen, type, &ent->lde_fid) < 0)
620                         GOTO(out, rc = 1);
621
622 next:
623                 rc = iops->next(env, di);
624         } while (rc == 0);
625
626         GOTO(out, rc = 0);
627
628 out:
629         iops->put(env, di);
630         iops->fini(env, di);
631
632         return rc;
633 }
634
635 /**
636  * Notify remote LFSCK instance that the object's LMV EA has been updated.
637  *
638  * \param[in] env       pointer to the thread context
639  * \param[in] com       pointer to the lfsck component
640  * \param[in] obj       pointer to the object on which the LMV EA will be set
641  * \param[in] event     indicate either master or slave LMV EA has been updated
642  * \param[in] flags     indicate which element(s) in the LMV EA has been updated
643  * \param[in] index     the MDT index on which the LFSCK instance to be notified
644  *
645  * \retval              positive number if nothing to be done
646  * \retval              zero for succeed
647  * \retval              negative error number on failure
648  */
649 static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
650                                              struct lfsck_component *com,
651                                              struct dt_object *obj,
652                                              __u32 event, __u32 flags,
653                                              __u32 index)
654 {
655         struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
656         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
657         struct lfsck_instance           *lfsck  = com->lc_lfsck;
658         struct lfsck_tgt_desc           *ltd    = NULL;
659         struct ptlrpc_request           *req    = NULL;
660         int                              rc;
661         ENTRY;
662
663         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
664         if (ltd == NULL)
665                 GOTO(out, rc = -ENODEV);
666
667         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
668                                    &RQF_LFSCK_NOTIFY);
669         if (req == NULL)
670                 GOTO(out, rc = -ENOMEM);
671
672         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
673         if (rc != 0) {
674                 ptlrpc_request_free(req);
675
676                 GOTO(out, rc);
677         }
678
679         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
680         memset(lr, 0, sizeof(*lr));
681         lr->lr_event = event;
682         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
683         lr->lr_active = LFSCK_TYPE_NAMESPACE;
684         lr->lr_fid = *fid;
685         lr->lr_flags = flags;
686
687         ptlrpc_request_set_replen(req);
688         rc = ptlrpc_queue_wait(req);
689         ptlrpc_req_finished(req);
690
691         GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
692
693 out:
694         CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
695                "object "DFID" on MDT %x remotely with event %u, flags %u: "
696                "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
697                event, flags, rc);
698
699         if (ltd != NULL)
700                 lfsck_tgt_put(ltd);
701
702         return rc;
703 }
704
705 /**
706  * Generate request for local LFSCK instance to rescan the striped directory.
707  *
708  * \param[in] env       pointer to the thread context
709  * \param[in] com       pointer to the lfsck component
710  * \param[in] obj       pointer to the striped directory to be rescanned
711  *
712  * \retval              positive number if nothing to be done
713  * \retval              zero for succeed
714  * \retval              negative error number on failure
715  */
716 int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
717                                             struct lfsck_component *com,
718                                             struct dt_object *obj)
719 {
720         struct lfsck_instance      *lfsck = com->lc_lfsck;
721         struct lfsck_namespace     *ns    = com->lc_file_ram;
722         struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
723         struct lfsck_lmv_unit      *llu;
724         struct lfsck_lmv           *llmv;
725         struct lfsck_slave_lmv_rec *lslr;
726         int                         count = 0;
727         int                         rc;
728         ENTRY;
729
730         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
731                 RETURN(0);
732
733         rc = lfsck_read_stripe_lmv(env, obj, lmv4);
734         if (rc != 0)
735                 RETURN(rc);
736
737         OBD_ALLOC_PTR(llu);
738         if (unlikely(llu == NULL))
739                 RETURN(-ENOMEM);
740
741         if (lmv4->lmv_stripe_count < 1)
742                 count = LFSCK_LMV_DEF_STRIPES;
743         else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
744                 count = LFSCK_LMV_MAX_STRIPES;
745         else
746                 count = lmv4->lmv_stripe_count;
747
748         OBD_ALLOC_LARGE(lslr, sizeof(struct lfsck_slave_lmv_rec) * count);
749         if (lslr == NULL) {
750                 OBD_FREE_PTR(llu);
751
752                 RETURN(-ENOMEM);
753         }
754
755         INIT_LIST_HEAD(&llu->llu_link);
756         llu->llu_lfsck = lfsck;
757         llu->llu_obj = lfsck_object_get(obj);
758         llmv = &llu->llu_lmv;
759         llmv->ll_lmv_master = 1;
760         llmv->ll_inline = 1;
761         atomic_set(&llmv->ll_ref, 1);
762         llmv->ll_stripes_allocated = count;
763         llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
764         llmv->ll_lslr = lslr;
765         llmv->ll_lmv = *lmv4;
766
767         down_write(&com->lc_sem);
768         if (ns->ln_status != LS_SCANNING_PHASE1 &&
769             ns->ln_status != LS_SCANNING_PHASE2) {
770                 ns->ln_striped_dirs_skipped++;
771                 up_write(&com->lc_sem);
772                 lfsck_lmv_put(env, llmv);
773         } else {
774                 ns->ln_striped_dirs_repaired++;
775                 spin_lock(&lfsck->li_lock);
776                 list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
777                 spin_unlock(&lfsck->li_lock);
778                 up_write(&com->lc_sem);
779         }
780
781         RETURN(0);
782 }
783
784 /**
785  * Set master LMV EA for the specified striped directory.
786  *
787  * First, if the master MDT-object of a striped directory lost its LMV EA,
788  * then there may be some users have created some files under the master
789  * MDT-object directly. Under such case, the LFSCK cannot re-generate LMV
790  * EA for the master MDT-object, because we should keep the existing files
791  * to be visible to client. Then the LFSCK will mark the striped directory
792  * as read-only and keep it there to be handled by administrator manually.
793  *
794  * If nobody has created files under the master MDT-object of the striped
795  * directory, then we will set the master LMV EA and generate a new rescan
796  * (the striped directory) request that will be handled later by the LFSCK
797  * instance on the MDT later.
798  *
799  * \param[in] env       pointer to the thread context
800  * \param[in] com       pointer to the lfsck component
801  * \param[in] dir       pointer to the object on which the LMV EA will be set
802  * \param[in] lmv       pointer to the buffer holding the new LMV EA
803  * \param[in] cfid      the shard's FID used for verification
804  * \param[in] cidx      the shard's index used for verification
805  * \param[in] flags     to indicate which element(s) in the LMV EA will be set
806  *
807  * \retval              positive number if nothing to be done
808  * \retval              zero for succeed
809  * \retval              negative error number on failure
810  */
811 static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
812                                           struct lfsck_component *com,
813                                           struct dt_object *dir,
814                                           struct lmv_mds_md_v1 *lmv,
815                                           const struct lu_fid *cfid,
816                                           __u32 cidx, __u32 flags)
817 {
818         struct lfsck_thread_info        *info   = lfsck_env_info(env);
819         struct lmv_mds_md_v1            *lmv3   = &info->lti_lmv3;
820         struct lfsck_instance           *lfsck  = com->lc_lfsck;
821         struct dt_object                *obj;
822         struct lustre_handle             lh     = { 0 };
823         int                              pidx   = -1;
824         int                              rc     = 0;
825         ENTRY;
826
827         /* Find the bottom object to bypass LOD when set LMV EA. */
828         obj = lu2dt(container_of0(dir->do_lu.lo_header->loh_layers.prev,
829                                   struct lu_object, lo_linkage));
830         if (unlikely(obj == NULL))
831                 RETURN(-ENOENT);
832
833         if (dt_object_remote(obj)) {
834                 struct lu_seq_range     *range  = &info->lti_range;
835                 struct seq_server_site  *ss     =
836                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
837
838                 fld_range_set_mdt(range);
839                 rc = fld_server_lookup(env, ss->ss_server_fld,
840                                        fid_seq(lfsck_dto2fid(obj)), range);
841                 if (rc != 0)
842                         GOTO(log, rc);
843
844                 pidx = range->lsr_index;
845         } else {
846                 pidx = lfsck_dev_idx(lfsck->li_bottom);
847         }
848
849         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
850                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
851                               LCK_EX);
852         if (rc != 0)
853                 GOTO(log, rc);
854
855         rc = lfsck_read_stripe_lmv(env, obj, lmv3);
856         if (rc == -ENODATA) {
857                 if (!(flags & LEF_SET_LMV_ALL))
858                         GOTO(log, rc);
859
860                 *lmv3 = *lmv;
861         } else if (rc == 0) {
862                 if (flags & LEF_SET_LMV_ALL)
863                         GOTO(log, rc = 1);
864
865                 if (flags & LEF_SET_LMV_HASH)
866                         lmv3->lmv_hash_type = lmv->lmv_hash_type;
867         } else {
868                 GOTO(log, rc);
869         }
870
871         lmv3->lmv_magic = LMV_MAGIC;
872         lmv3->lmv_master_mdt_index = pidx;
873
874         if (flags & LEF_SET_LMV_ALL) {
875                 rc = lfsck_allow_set_master_lmv(env, com, obj, cfid, cidx);
876                 if (rc > 0) {
877                         rc = lfsck_disable_master_lmv(env, com, obj, false);
878
879                         GOTO(log, rc = (rc == 0 ? 1 : rc));
880                 }
881
882                 if (rc < 0)
883                         GOTO(log, rc);
884
885                 /* To indicate that the master has ever lost LMV EA. */
886                 lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
887         }
888
889         rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
890         if (rc == 0 && flags & LEF_SET_LMV_ALL) {
891                 if (dt_object_remote(obj))
892                         rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
893                                                 LE_SET_LMV_MASTER, 0, pidx);
894                 else
895                         rc = lfsck_namespace_notify_lmv_master_local(env, com,
896                                                                      obj);
897         }
898
899         GOTO(log, rc);
900
901 log:
902         lfsck_ibits_unlock(&lh, LCK_EX);
903         CDEBUG(D_LFSCK, "%s: namespace LFSCK set master LMV EA for the object "
904                DFID" on the %s MDT %d, flags %x: rc = %d\n",
905                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
906                dt_object_remote(obj) ? "remote" : "local", pidx, flags, rc);
907
908         if (rc <= 0) {
909                 struct lfsck_namespace *ns = com->lc_file_ram;
910
911                 ns->ln_flags |= LF_INCONSISTENT;
912         }
913
914         return rc;
915 }
916
917 /**
918  * Repair the bad name hash.
919  *
920  * If the name hash of some name entry under the striped directory does not
921  * match the shard of the striped directory, then the LFSCK will repair the
922  * inconsistency. Ideally, the LFSCK should migrate the name entry from the
923  * current MDT to the right MDT (another one), but before the async commit
924  * finished, the LFSCK will change the striped directory's hash type as
925  * LMV_HASH_TYPE_UNKNOWN and mark the lmv flags as LMV_HASH_FLAG_BAD_TYPE.
926  *
927  * \param[in] env       pointer to the thread context
928  * \param[in] com       pointer to the lfsck component
929  * \param[in] shard     pointer to the shard of the striped directory that
930  *                      contains the bad name entry
931  * \param[in] llmv      pointer to lfsck LMV EA structure
932  * \param[in] name      the name of the bad name hash
933  *
934  * \retval              positive number if nothing to be done
935  * \retval              zero for succeed
936  * \retval              negative error number on failure
937  */
938 int lfsck_namespace_repair_bad_name_hash(const struct lu_env *env,
939                                          struct lfsck_component *com,
940                                          struct dt_object *shard,
941                                          struct lfsck_lmv *llmv,
942                                          const char *name)
943 {
944         struct lfsck_thread_info        *info   = lfsck_env_info(env);
945         struct lu_fid                   *pfid   = &info->lti_fid3;
946         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
947         struct lfsck_instance           *lfsck  = com->lc_lfsck;
948         struct dt_object                *parent = NULL;
949         int                              rc     = 0;
950         ENTRY;
951
952         rc = dt_lookup(env, shard, (struct dt_rec *)pfid,
953                        (const struct dt_key *)dotdot, BYPASS_CAPA);
954         if (rc != 0 || !fid_is_sane(pfid))
955                 GOTO(log, rc);
956
957         parent = lfsck_object_find_bottom(env, lfsck, pfid);
958         if (IS_ERR(parent))
959                 GOTO(log, rc = PTR_ERR(parent));
960
961         *lmv2 = llmv->ll_lmv;
962         lmv2->lmv_hash_type = LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE;
963         rc = lfsck_namespace_set_lmv_master(env, com, parent, lmv2,
964                                             lfsck_dto2fid(shard),
965                                             llmv->ll_lmv.lmv_master_mdt_index,
966                                             LEF_SET_LMV_HASH);
967
968         GOTO(log, rc);
969
970 log:
971         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name hash "
972                "on the MDT %x, parent "DFID", name %s, shard_%x "DFID
973                ": rc = %d\n",
974                lfsck_lfsck2name(lfsck), lfsck_dev_idx(lfsck->li_bottom),
975                PFID(pfid), name, llmv->ll_lmv.lmv_master_mdt_index,
976                PFID(lfsck_dto2fid(shard)), rc);
977
978         if (parent != NULL && !IS_ERR(parent))
979                 lfsck_object_put(env, parent);
980
981         return rc;
982 }
983
984 /**
985  * Scan the shard of a striped directory for name hash verification.
986  *
987  * During the first-stage scanning, if the LFSCK cannot make sure whether
988  * the shard of a stripe directory contains valid slave LMV EA or not, then
989  * it will skip the name hash verification for this shard temporarily, and
990  * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
991  * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
992  * Then in the second-stage scanning, the shard will be re-scanned, and for
993  * every name entry under the shard, the name hash will be verified, and for
994  * unmatched name entry, the LFSCK will try to fix it.
995  *
996  * \param[in] env       pointer to the thread context
997  * \param[in] com       pointer to the lfsck component
998  * \param[in] child     pointer to the directory object to be handled
999  *
1000  * \retval              positive number for scanning successfully
1001  * \retval              zero for the scanning is paused
1002  * \retval              negative error number on failure
1003  */
1004 int lfsck_namespace_scan_shard(const struct lu_env *env,
1005                                struct lfsck_component *com,
1006                                struct dt_object *child)
1007 {
1008         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1009         struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
1010         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1011         struct lfsck_namespace          *ns     = com->lc_file_ram;
1012         struct ptlrpc_thread            *thread = &lfsck->li_thread;
1013         struct lu_dirent                *ent    =
1014                         (struct lu_dirent *)info->lti_key;
1015         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1016         struct lfsck_lmv                *llmv   = NULL;
1017         const struct dt_it_ops          *iops;
1018         struct dt_it                    *di;
1019         __u64                            cookie;
1020         __u32                            args;
1021         int                              rc;
1022         __u16                            type;
1023         ENTRY;
1024
1025         rc = lfsck_read_stripe_lmv(env, child, lmv);
1026         if (rc != 0)
1027                 RETURN(rc == -ENODATA ? 1 : rc);
1028
1029         if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
1030                 RETURN(1);
1031
1032         if (unlikely(!dt_try_as_dir(env, child)))
1033                 RETURN(-ENOTDIR);
1034
1035         OBD_ALLOC_PTR(llmv);
1036         if (llmv == NULL)
1037                 RETURN(-ENOMEM);
1038
1039         llmv->ll_lmv_slave = 1;
1040         llmv->ll_lmv_verified = 1;
1041         llmv->ll_lmv = *lmv;
1042         atomic_set(&llmv->ll_ref, 1);
1043
1044         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1045         iops = &child->do_index_ops->dio_it;
1046         di = iops->init(env, child, args, BYPASS_CAPA);
1047         if (IS_ERR(di))
1048                 GOTO(out, rc = PTR_ERR(di));
1049
1050         rc = iops->load(env, di, 0);
1051         if (rc == 0)
1052                 rc = iops->next(env, di);
1053         else if (rc > 0)
1054                 rc = 0;
1055
1056         while (rc == 0) {
1057                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1058                     cfs_fail_val > 0) {
1059                         struct l_wait_info lwi;
1060
1061                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1062                                           NULL, NULL);
1063                         l_wait_event(thread->t_ctl_waitq,
1064                                      !thread_is_running(thread),
1065                                      &lwi);
1066
1067                         if (unlikely(!thread_is_running(thread)))
1068                                 GOTO(out, rc = 0);
1069                 }
1070
1071                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1072                 if (rc == 0)
1073                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1074
1075                 if (rc != 0) {
1076                         if (bk->lb_param & LPF_FAILOUT)
1077                                 GOTO(out, rc);
1078
1079                         goto next;
1080                 }
1081
1082                 /* skip dot and dotdot entries */
1083                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1084                         goto next;
1085
1086                 if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
1087                                                      ent->lde_namelen)) {
1088                         ns->ln_flags |= LF_INCONSISTENT;
1089                         rc = lfsck_namespace_repair_bad_name_hash(env, com,
1090                                                 child, llmv, ent->lde_name);
1091                         if (rc >= 0)
1092                                 ns->ln_name_hash_repaired++;
1093                 }
1094
1095                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1096                         GOTO(out, rc);
1097
1098                 /* Rate control. */
1099                 lfsck_control_speed(lfsck);
1100                 if (unlikely(!thread_is_running(thread)))
1101                         GOTO(out, rc = 0);
1102
1103                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
1104                         spin_lock(&lfsck->li_lock);
1105                         thread_set_flags(thread, SVC_STOPPING);
1106                         spin_unlock(&lfsck->li_lock);
1107
1108                         GOTO(out, rc = -EINVAL);
1109                 }
1110
1111 next:
1112                 rc = iops->next(env, di);
1113         }
1114
1115         GOTO(out, rc);
1116
1117 out:
1118         iops->put(env, di);
1119         iops->fini(env, di);
1120         lfsck_lmv_put(env, llmv);
1121
1122         return rc;
1123 }
1124
1125 /**
1126  * Verify the slave object's (of striped directory) LMV EA.
1127  *
1128  * For the slave object of a striped directory, before traversing the shard
1129  * the LFSCK will verify whether its slave LMV EA matches its parent's master
1130  * LMV EA or not.
1131  *
1132  * \param[in] env       pointer to the thread context
1133  * \param[in] com       pointer to the lfsck component
1134  * \param[in] obj       pointer to the object which LMV EA will be checked
1135  * \param[in] llmv      pointer to buffer holding the slave LMV EA
1136  *
1137  * \retval              zero for succeed
1138  * \retval              negative error number on failure
1139  */
1140 int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
1141                                         struct lfsck_component *com,
1142                                         struct dt_object *obj,
1143                                         struct lfsck_lmv *llmv)
1144 {
1145         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1146         char                            *name   = info->lti_key;
1147         char                            *name2;
1148         struct lu_fid                   *pfid   = &info->lti_fid3;
1149         struct lu_fid                   *tfid   = &info->lti_fid4;
1150         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
1151         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1152         struct lmv_mds_md_v1            *clmv   = &llmv->ll_lmv;
1153         struct lmv_mds_md_v1            *plmv   = &info->lti_lmv;
1154         struct dt_object                *parent = NULL;
1155         int                              rc     = 0;
1156         ENTRY;
1157
1158         if (!lfsck_is_valid_slave_lmv(clmv)) {
1159                 rc = lfsck_namespace_trace_update(env, com, cfid,
1160                                         LNTF_UNCERTAIN_LMV, true);
1161
1162                 GOTO(out, rc);
1163         }
1164
1165         rc = dt_lookup(env, obj, (struct dt_rec *)pfid,
1166                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1167         if (rc != 0 || !fid_is_sane(pfid)) {
1168                 rc = lfsck_namespace_trace_update(env, com, cfid,
1169                                         LNTF_UNCERTAIN_LMV, true);
1170
1171                 GOTO(out, rc);
1172         }
1173
1174         parent = lfsck_object_find(env, lfsck, pfid);
1175         if (IS_ERR(parent)) {
1176                 rc = lfsck_namespace_trace_update(env, com, cfid,
1177                                         LNTF_UNCERTAIN_LMV, true);
1178
1179                 GOTO(out, rc);
1180         }
1181
1182         rc = lfsck_read_stripe_lmv(env, parent, plmv);
1183         if (rc != 0) {
1184                 int rc1;
1185
1186                 /* If the parent has no LMV EA, then it maybe because:
1187                  * 1) The parent lost the LMV EA.
1188                  * 2) The child claims a wrong (slave) LMV EA. */
1189                 if (rc == -ENODATA)
1190                         rc = lfsck_namespace_set_lmv_master(env, com, parent,
1191                                         clmv, cfid, clmv->lmv_master_mdt_index,
1192                                         LEF_SET_LMV_ALL);
1193                 else
1194                         rc = 0;
1195
1196                 rc1 = lfsck_namespace_trace_update(env, com, cfid,
1197                                                    LNTF_UNCERTAIN_LMV, true);
1198
1199                 GOTO(out, rc = (rc < 0 ? rc : rc1));
1200         }
1201
1202         /* Unmatched magic or stripe count. */
1203         if (unlikely(plmv->lmv_magic != LMV_MAGIC ||
1204                      plmv->lmv_stripe_count != clmv->lmv_stripe_count)) {
1205                 rc = lfsck_namespace_trace_update(env, com, cfid,
1206                                                   LNTF_UNCERTAIN_LMV, true);
1207
1208                 GOTO(out, rc);
1209         }
1210
1211         /* If the master hash type has been set as LMV_HASH_TYPE_UNKNOWN,
1212          * then the slave hash type is not important. */
1213         if ((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) ==
1214             LMV_HASH_TYPE_UNKNOWN &&
1215             plmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE)
1216                 GOTO(out, rc = 0);
1217
1218         /* Unmatched hash type. */
1219         if (unlikely((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) !=
1220                      (clmv->lmv_hash_type & LMV_HASH_TYPE_MASK))) {
1221                 rc = lfsck_namespace_trace_update(env, com, cfid,
1222                                                   LNTF_UNCERTAIN_LMV, true);
1223
1224                 GOTO(out, rc);
1225         }
1226
1227         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
1228                  PFID(cfid), clmv->lmv_master_mdt_index);
1229         name2 = info->lti_tmpbuf2;
1230
1231         rc = lfsck_links_get_first(env, obj, name, tfid);
1232         if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, tfid)) {
1233                 llmv->ll_lmv_verified = 1;
1234
1235                 GOTO(out, rc);
1236         }
1237
1238         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1239                        (const struct dt_key *)name2, BYPASS_CAPA);
1240         if (rc != 0 || !lu_fid_eq(cfid, tfid))
1241                 rc = lfsck_namespace_trace_update(env, com, cfid,
1242                                                   LNTF_UNCERTAIN_LMV, true);
1243         else
1244                 llmv->ll_lmv_verified = 1;
1245
1246         GOTO(out, rc);
1247
1248 out:
1249         if (parent != NULL && !IS_ERR(parent))
1250                 lfsck_object_put(env, parent);
1251
1252         return rc;
1253 }
1254
1255 /**
1256  * Double scan the striped directory or the shard.
1257  *
1258  * \param[in] env       pointer to the thread context
1259  * \param[in] com       pointer to the lfsck component
1260  * \param[in] lnr       pointer to the namespace request that contains the
1261  *                      striped directory or the shard
1262  *
1263  * \retval              zero for succeed
1264  * \retval              negative error number on failure
1265  */
1266 int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
1267                                        struct lfsck_component *com,
1268                                        struct lfsck_namespace_req *lnr)
1269 {
1270         struct lfsck_namespace          *ns     = com->lc_file_ram;
1271         struct lfsck_lmv                *llmv   = lnr->lnr_lmv;
1272         struct dt_object                *dir    = lnr->lnr_obj;
1273         ENTRY;
1274
1275         /* XXX: it will be improved with subsequent patches landed. */
1276
1277         if (llmv->ll_lmv_slave && llmv->ll_lmv_verified) {
1278                 ns->ln_striped_shards_scanned++;
1279                 lfsck_namespace_trace_update(env, com,
1280                                         lfsck_dto2fid(dir),
1281                                         LNTF_UNCERTAIN_LMV |
1282                                         LNTF_RECHECK_NAME_HASH, false);
1283         }
1284
1285         RETURN(0);
1286 }