Whamcloud - gitweb
LU-11025 dne: support directory restripe
[fs/lustre-release.git] / lustre / lfsck / lfsck_striped_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_striped_dir.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 /*
32  * About the verification for striped directory. Some rules and assumptions:
33  *
34  * 1) lmv_magic: The magic may be wrong. But it is almost impossible (1/2^32
35  *    probability) that a master LMV EA claims as a slave LMV EA by wrong,
36  *    so we can ignore such rare case and the reverse case.
37  *
38  * 2) lmv_master_mdt_index: The master index can be self-verified by compared
39  *    with the MDT index directly. The slave stripe index can be verified by
40  *    compared with the file name. Although both the name entry and the LMV EA
41  *    can be wrong, it is almost impossible that they hit the same bad data
42  *    So if they match each other, then trust them. Similarly, for the shard,
43  *    it stores index in both slave LMV EA and in linkEA, if the two copies
44  *    match, then trust them.
45  *
46  * 3) lmv_hash_type: The valid hash type should be LMV_HASH_TYPE_ALL_CHARS or
47  *    LMV_HASH_TYPE_FNV_1A_64. If the LFSCK instance on some slave finds that
48  *    the name hash against the hash function does not match the MDT, then it
49  *    will change the master LMV EA hash type as LMV_HASH_TYPE_UNKNOWN. With
50  *    such hash type, the whole striped directory still can be accessed via
51  *    lookup/readdir, and also support unlink, but cannot add new name entry.
52  *
53  * 3.1) If the master hash type is one of the valid values, then trust the
54  *      master LMV EA. Because:
55  *
56  * 3.1.1) The master hash type is visible to the client and used by the client.
57  *
58  * 3.1.2) For a given name, different hash types may map the name entry to the
59  *        same MDT. So simply checking one name entry or some name entries may
60  *        not verify whether the hash type is correct or not.
61  *
62  * 3.1.3) Different shards can claim different hash types, it is not easy to
63  *        distinguish which ones are correct. Even though the master is wrong,
64  *        as the LFSCK processing, some LFSCK instance on other MDT may find
65  *        unmatched name hash, then it will change the master hash type to
66  *        LMV_HASH_TYPE_UNKNOWN as described above. The worst case is equal
67  *        to the case without the LFSCK.
68  *
69  * 3.2) If the master hash type is invalid, nor LMV_HASH_TYPE_UNKNOWN, then
70  *      trust the first shard with valid hash type (ALL_CHARS or FNV_1A_64).
71  *      If the shard is also wrong, means there are double failures, then as
72  *      the LFSCK processing, other LFSCK instances on the other MDTs may
73  *      find unmatched name hash, and then, the master hash type will be
74  *      changed to LMV_HASH_TYPE_UNKNOWN as described in the 3).
75  *
76  * 3.3) If the master hash type is LMV_HASH_TYPE_UNKNOWN, then it is possible
77  *      that some other LFSCK instance on other MDT found bad name hash, then
78  *      changed the master hash type to LMV_HASH_TYPE_UNKNOWN as described in
79  *      the 3). But it also maybe because of data corruption in master LMV EA.
80  *      To make such two cases to be distinguishable, when the LFSCK changes
81  *      the master hash type to LMV_HASH_TYPE_UNKNOWN, it will mark in the
82  *      master LMV EA (new lmv flags LMV_HASH_FLAG_BAD_TYPE). Then subsequent
83  *      LFSCK checking can distinguish them: for former case, trust the master
84  *      LMV EA with nothing to be done; otherwise, trust the first shard with
85  *      valid hash type (ALL_CHARS or FNV_1A_64) as the 3.2) does.
86  *
87  * 4) lmv_stripe_count: For a shard of a striped directory, if its index has
88  *    been verified as the 2), then the stripe count must be larger than its
89  *    index. For the master object, by scanning each shard's index, the LFSCK
90  *    can know the highest index, and the stripe count must be larger than the
91  *    known highest index. If the stripe count in the LMV EA matches above two
92  *    rules, then it may be trustable. If both the master claimed stripe
93  *    count and the slave claimed stripe count match each own rule, but they
94  *    are not the same, then trust the master. Because the stripe count in
95  *    the master LMV EA is visible to client and used to distribute the name
96  *    entry to some shard, but the slave LMV EA is only used for verification
97  *    and invisible to client.
98  *
99  * 5) If the master LMV EA is lost, then there are two possible cases:
100  *
101  * 5.1) The slave claims slave LMV EA by wrong, means that the parent was not
102  *      a striped directory, but its sub-directory has a wrong slave LMV EA.
103  *      It is a very rare case, similar to the 1), and can be ignored.
104  *
105  * 5.2) The parent directory is a striped directory, but the master LMV EA
106  *      is lost or crashed. Then the LFSCK needs to re-generate the master
107  *      LMV EA: the lmv_master_mdt_index is from the MDT device index; the
108  *      lmv_hash_type is from the first valid shard; the lmv_stripe_count
109  *      will be calculated via scanning all the shards.
110  *
111  * 5.2.1) Before re-generating the master LMV EA, the LFSCK needs to check
112  *        whether someone has created some file(s) under the master object
113  *        after the master LMV EA disappeared. If yes, the LFSCK cannot
114  *        re-generate the master LMV EA, otherwise, such new created files
115  *        will be invisible to client. Under such case, the LFSCK will mark
116  *        the master object as read only (without master LMV EA). Then all
117  *        things under the master MDT-object, including those new created
118  *        files and the shards themselves, will be visible to client. And
119  *        then the administrator can handle the bad striped directory with
120  *        more human knowledge.
121  *
122  * 5.2.2) If someone created some special sub-directory under the master
123  *        MDT-object with the same naming rule as shard name $FID:$index,
124  *        as to the LFSCK cannot detect it before re-generating the master
125  *        LMV EA, then such sub-directory itself will be invisible after
126  *        the LFSCK re-generating the master LMV EA. The sub-items under
127  *        such sub-directory are still visible to client. As the LFSCK
128  *        processing, if such sub-directory cause some conflict with other
129  *        normal shard, such as the index conflict, then the LFSCK will
130  *        remove the master LMV EA and change the master MDT-object to
131  *        read-only mode as the 5.2.1). But if there is no conflict, the
132  *        LFSCK will regard such sub-directory as a striped shard that
133  *        lost its slave LMV EA, and will re-generate slave LMV EA for it.
134  *
135  * 5.2.3) Anytime, if the LFSCK found some shards name/index conflict,
136  *        and cannot distinguish which one is right, then it
137  *        will remove the master LMV EA and change the MDT-object to
138  *        read-only mode as the 5.2.2).
139  */
140
141 #define DEBUG_SUBSYSTEM S_LFSCK
142
143 #include <lu_object.h>
144 #include <dt_object.h>
145 #include <md_object.h>
146 #include <lustre_fid.h>
147 #include <lustre_lib.h>
148 #include <lustre_net.h>
149 #include <lustre_lmv.h>
150
151 #include "lfsck_internal.h"
152
153 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
154 {
155         if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
156                 if (llmv->ll_inline) {
157                         struct lfsck_lmv_unit   *llu;
158                         struct lfsck_instance   *lfsck;
159
160                         llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
161                         lfsck = llu->llu_lfsck;
162
163                         spin_lock(&lfsck->li_lock);
164                         list_del(&llu->llu_link);
165                         spin_unlock(&lfsck->li_lock);
166
167                         lfsck_object_put(env, llu->llu_obj);
168
169                         LASSERT(llmv->ll_lslr != NULL);
170
171                         OBD_FREE_PTR_ARRAY_LARGE(llmv->ll_lslr,
172                                                  llmv->ll_stripes_allocated);
173                         OBD_FREE_PTR(llu);
174                 } else {
175                         if (llmv->ll_lslr != NULL)
176                                 OBD_FREE_PTR_ARRAY_LARGE(
177                                         llmv->ll_lslr,
178                                         llmv->ll_stripes_allocated);
179
180                         OBD_FREE_PTR(llmv);
181                 }
182         }
183 }
184
185 /**
186  * Mark the specified directory as read-only by set LUSTRE_IMMUTABLE_FL.
187  *
188  * The caller has taken the ldlm lock on the @obj already.
189  *
190  * \param[in] env       pointer to the thread context
191  * \param[in] com       pointer to the lfsck component
192  * \param[in] obj       pointer to the object to be handled
193  * \param[in] del_lmv   true if need to drop the LMV EA
194  *
195  * \retval              positive number if nothing to be done
196  * \retval              zero for success
197  * \retval              negative error number on failure
198  */
199 static int lfsck_disable_master_lmv(const struct lu_env *env,
200                                     struct lfsck_component *com,
201                                     struct dt_object *obj, bool del_lmv)
202 {
203         struct lfsck_thread_info        *info   = lfsck_env_info(env);
204         struct lu_attr                  *la     = &info->lti_la;
205         struct lfsck_instance           *lfsck  = com->lc_lfsck;
206         struct dt_device                *dev    = lfsck_obj2dev(obj);
207         struct thandle                  *th     = NULL;
208         int                              rc     = 0;
209         ENTRY;
210
211         th = dt_trans_create(env, dev);
212         if (IS_ERR(th))
213                 GOTO(log, rc = PTR_ERR(th));
214
215         if (del_lmv) {
216                 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
217                 if (rc != 0)
218                         GOTO(stop, rc);
219         }
220
221         la->la_valid = LA_FLAGS;
222         rc = dt_declare_attr_set(env, obj, la, th);
223         if (rc != 0)
224                 GOTO(stop, rc);
225
226         rc = dt_trans_start_local(env, dev, th);
227         if (rc != 0)
228                 GOTO(stop, rc);
229
230         dt_write_lock(env, obj, 0);
231         if (unlikely(lfsck_is_dead_obj(obj)))
232                 GOTO(unlock, rc = 1);
233
234         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
235                 GOTO(unlock, rc = 0);
236
237         if (del_lmv) {
238                 rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th);
239                 if (rc != 0)
240                         GOTO(unlock, rc);
241         }
242
243         rc = dt_attr_get(env, obj, la);
244         if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
245                 la->la_valid = LA_FLAGS;
246                 la->la_flags |= LUSTRE_IMMUTABLE_FL;
247                 rc = dt_attr_set(env, obj, la, th);
248         }
249
250         GOTO(unlock, rc);
251
252 unlock:
253         dt_write_unlock(env, obj);
254
255 stop:
256         dt_trans_stop(env, dev, th);
257
258 log:
259         CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
260                "the striped directory "DFID" as read-only: rc = %d\n",
261                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
262
263         if (rc <= 0) {
264                 struct lfsck_namespace *ns = com->lc_file_ram;
265
266                 ns->ln_flags |= LF_INCONSISTENT;
267                 if (rc == 0)
268                         ns->ln_striped_dirs_disabled++;
269         }
270
271         return rc;
272 }
273
274 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
275 {
276         return lmv->lmv_stripe_count >= 1 &&
277                lmv->lmv_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
278                lmv->lmv_stripe_count > lmv->lmv_master_mdt_index &&
279                lmv_is_known_hash_type(lmv->lmv_hash_type);
280 }
281
282 /**
283  * Remove the striped directory's master LMV EA and mark it as read-only.
284  *
285  * Take ldlm lock on the striped directory before calling the
286  * lfsck_disable_master_lmv().
287  *
288  * \param[in] env       pointer to the thread context
289  * \param[in] com       pointer to the lfsck component
290  * \param[in] obj       pointer to the striped directory to be handled
291  * \param[in] lnr       pointer to the namespace request that contains the
292  *                      striped directory to be handled and other information
293  *
294  * \retval              positive number if nothing to be done
295  * \retval              zero for success
296  * \retval              negative error number on failure
297  */
298 static int lfsck_remove_lmv(const struct lu_env *env,
299                             struct lfsck_component *com,
300                             struct dt_object *obj,
301                             struct lfsck_namespace_req *lnr)
302 {
303         struct lustre_handle     lh     = { 0 };
304         int                      rc;
305
306         lnr->lnr_lmv->ll_ignore = 1;
307         rc = lfsck_ibits_lock(env, com->lc_lfsck, obj, &lh,
308                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
309                               LCK_EX);
310         if (rc == 0) {
311                 rc = lfsck_disable_master_lmv(env, com, obj, true);
312                 lfsck_ibits_unlock(&lh, LCK_EX);
313         }
314
315         return rc;
316 }
317
318 /**
319  * Remove the name entry from the striped directory's master MDT-object.
320  *
321  * \param[in] env       pointer to the thread context
322  * \param[in] com       pointer to the lfsck component
323  * \param[in] dir       pointer to the striped directory
324  * \param[in] fid       the shard's FID which name entry will be removed
325  * \param[in] index     the shard's index which name entry will be removed
326  *
327  * \retval              positive number for repaired successfully
328  * \retval              0 if nothing to be repaired
329  * \retval              negative error number on failure
330  */
331 static int lfsck_remove_dirent(const struct lu_env *env,
332                                struct lfsck_component *com,
333                                struct dt_object *dir,
334                                const struct lu_fid *fid, __u32 index)
335 {
336         struct lfsck_thread_info        *info = lfsck_env_info(env);
337         struct dt_object                *obj;
338         int                              rc;
339
340         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
341                  PFID(fid), index);
342         obj = lfsck_object_find_bottom(env, com->lc_lfsck, fid);
343         if (IS_ERR(obj))
344                 return PTR_ERR(obj);
345
346         rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
347                                         info->lti_tmpbuf2, info->lti_tmpbuf2,
348                                         S_IFDIR, false, false);
349         lfsck_object_put(env, obj);
350         if (rc > 0) {
351                 struct lfsck_namespace *ns = com->lc_file_ram;
352
353                 ns->ln_dirent_repaired++;
354         }
355
356         return rc;
357 }
358
359 /**
360  * Remove old shard's name entry and refill the @lslr slot with new shard.
361  *
362  * Some old shard held the specified @lslr slot, but it is an invalid shard.
363  * This function will remove the bad shard's name entry, and refill the @lslr
364  * slot with the new shard.
365  *
366  * \param[in] env       pointer to the thread context
367  * \param[in] com       pointer to the lfsck component
368  * \param[in] dir       pointer to the striped directory to be handled
369  * \param[in] lslr      pointer to lfsck_disable_master_lmv slot which content
370  *                      will be replaced by the given information
371  * \param[in] lnr       contain the shard's FID to be used to fill the
372  *                      @lslr slot, it also records the known max filled index
373  *                      and the known max stripe count
374  * \param[in] lmv       contain the slave LMV EA to be used to fill the
375  *                      @lslr slot
376  * \param[in] index     the old shard's index in the striped directory
377  * \param[in] flags     the new shard's flags in the @lslr slot
378  *
379  * \retval              zero for success
380  * \retval              negative error number on failure
381  */
382 static int lfsck_replace_lmv(const struct lu_env *env,
383                              struct lfsck_component *com,
384                              struct dt_object *dir,
385                              struct lfsck_slave_lmv_rec *lslr,
386                              struct lfsck_namespace_req *lnr,
387                              struct lmv_mds_md_v1 *lmv,
388                              __u32 index, __u32 flags)
389 {
390         struct lfsck_lmv *llmv = lnr->lnr_lmv;
391         int               rc;
392
393         rc = lfsck_remove_dirent(env, com, dir,
394                                  &lslr->lslr_fid, index);
395         if (rc < 0)
396                 return rc;
397
398         lslr->lslr_fid = lnr->lnr_fid;
399         lslr->lslr_flags = flags;
400         lslr->lslr_stripe_count = lmv->lmv_stripe_count;
401         lslr->lslr_index = lmv->lmv_master_mdt_index;
402         lslr->lslr_hash_type = lmv->lmv_hash_type;
403         if (flags == LSLF_NONE) {
404                 if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
405                     lmv_is_known_hash_type(lmv->lmv_hash_type))
406                         llmv->ll_hash_type = lmv->lmv_hash_type;
407
408                 if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
409                     llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
410                         llmv->ll_max_stripe_count = lslr->lslr_stripe_count;
411         }
412
413         return 0;
414 }
415
416 /**
417  * Record the slave LMV EA in the lfsck_lmv::ll_lslr.
418  *
419  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is free,
420  * then fill the slot with the given @lnr/@lmv/@flags directly (maybe need to
421  * extend the lfsck_lmv::ll_lslr buffer).
422  *
423  * If the lfsck_lmv::ll_lslr slot corresponding to the given @shard_idx is taken
424  * by other shard, then the LFSCK will try to resolve the conflict by checking
425  * the two conflict shards' flags, and try other possible slot (if one of them
426  * claims another possible @shard_idx).
427  *
428  * 1) If one of the two conflict shards can be recorded in another slot, then
429  *    it is OK, go ahead. Otherwise,
430  *
431  * 2) If one of them is dangling name entry, then remove (one of) the dangling
432  *    name entry (and replace related @lslr slot if needed). Otherwise,
433  *
434  * 3) If one of them has no slave LMV EA, then check whether the master LMV
435  *    EA has ever been lost and re-generated (LMV_HASH_FLAG_LOST_LMV in the
436  *    master LMV EA).
437  *
438  * 3.1) If yes, then it is possible that such object is not a real shard of
439  *      the striped directory, instead, it was created by someone after the
440  *      master LMV EA lost with the name that matches the shard naming rule.
441  *      Then the LFSCK will remove the master LMV EA and mark the striped
442  *      directory as read-only to allow those non-shard files to be visible
443  *      to client.
444  *
445  * 3.2) If no, then remove (one of) the object what has no slave LMV EA.
446  *
447  * 4) If all above efforts cannot work, then the LFSCK cannot know how to
448  *    recover the striped directory. To make the administrator can see the
449  *    conflicts, the LFSCK will remove the master LMV EA and mark the striped
450  *    directory as read-only.
451  *
452  * This function may be called recursively, to prevent overflow, we define
453  * LFSCK_REC_LMV_MAX_DEPTH to restrict the recursive call depth.
454  *
455  * \param[in] env       pointer to the thread context
456  * \param[in] com       pointer to the lfsck component
457  * \param[in] dir       pointer to the striped directory to be handled
458  * \param[in] lnr       contain the shard's FID to fill the @lslr slot,
459  *                      it also records the known max filled index and
460  *                      the known max stripe count
461  * \param[in] lmv       pointer to the slave LMV EA to be recorded
462  * \param[in] shard_idx the shard's index used for locating the @lslr slot,
463  *                      it can be the index stored in the shard's name,
464  *                      it also can be the index stored in the slave LMV EA
465  *                      (for recursive case)
466  * \param[in] flags     the shard's flags to be recorded in the @lslr slot
467  *                      to indicate the shard status, such as whether has
468  *                      slave LMV EA, whether dangling name entry, whether
469  *                      the name entry and slave LMV EA unmatched, etc.
470  * \param[in] flags2    when be called recursively, the @flags2 tells the
471  *                      former conflict shard's flags in the @lslr slot.
472  * \param[in,out] depth To prevent to be called recursively too deep,
473  *                      we define the max depth can be called recursively
474  *                      (LFSCK_REC_LMV_MAX_DEPTH)
475  *
476  * \retval              zero for success
477  * \retval              "-ERANGE" for invalid @shard_idx
478  * \retval              "-EEXIST" for the required lslr slot has been
479  *                      occupied by other shard
480  * \retval              other negative error number on failure
481  */
482 static int lfsck_record_lmv(const struct lu_env *env,
483                             struct lfsck_component *com,
484                             struct dt_object *dir,
485                             struct lfsck_namespace_req *lnr,
486                             struct lmv_mds_md_v1 *lmv, __u32 shard_idx,
487                             __u32 flags, __u32 flags2, __u32 *depth)
488 {
489         struct lfsck_instance      *lfsck = com->lc_lfsck;
490         struct lfsck_lmv           *llmv  = lnr->lnr_lmv;
491         const struct lu_fid        *fid   = &lnr->lnr_fid;
492         struct lfsck_slave_lmv_rec *lslr;
493         struct lfsck_rec_lmv_save  *lrls;
494         int                         index = shard_idx;
495         int                         rc    = 0;
496         ENTRY;
497
498         CDEBUG(D_LFSCK, "%s: record slave LMV EA for the striped directory "
499                DFID": shard = "DFID", index = %u, flags = %u, flags2 = %u, "
500                "depth = %d\n", lfsck_lfsck2name(lfsck),
501                PFID(lfsck_dto2fid(dir)), PFID(fid),
502                index, flags, flags2, *depth);
503
504         if (index < 0 || index >= LFSCK_LMV_MAX_STRIPES)
505                 RETURN(-ERANGE);
506
507         if (index >= llmv->ll_stripes_allocated) {
508                 struct lfsck_slave_lmv_rec *new_lslr;
509                 int new_stripes = index + 1;
510                 size_t old_size = sizeof(*lslr) * llmv->ll_stripes_allocated;
511
512                 OBD_ALLOC_PTR_ARRAY_LARGE(new_lslr, new_stripes);
513                 if (new_lslr == NULL) {
514                         llmv->ll_failed = 1;
515
516                         RETURN(-ENOMEM);
517                 }
518
519                 memcpy(new_lslr, llmv->ll_lslr, old_size);
520                 OBD_FREE_LARGE(llmv->ll_lslr, old_size);
521                 llmv->ll_stripes_allocated = new_stripes;
522                 llmv->ll_lslr = new_lslr;
523         }
524
525         lslr = llmv->ll_lslr + index;
526         if (unlikely(lu_fid_eq(&lslr->lslr_fid, fid)))
527                 RETURN(0);
528
529         if (fid_is_zero(&lslr->lslr_fid)) {
530                 lslr->lslr_fid = *fid;
531                 lslr->lslr_stripe_count = lmv->lmv_stripe_count;
532                 lslr->lslr_index = lmv->lmv_master_mdt_index;
533                 lslr->lslr_hash_type = lmv->lmv_hash_type;
534                 lslr->lslr_flags = flags;
535                 llmv->ll_stripes_filled++;
536                 if (flags == LSLF_NONE) {
537                         if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
538                             lmv_is_known_hash_type(lmv->lmv_hash_type))
539                                 llmv->ll_hash_type = lmv->lmv_hash_type;
540
541                         if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
542                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
543                                 llmv->ll_max_stripe_count =
544                                                         lslr->lslr_stripe_count;
545                 }
546
547                 if (llmv->ll_max_filled_off < index)
548                         llmv->ll_max_filled_off = index;
549
550                 RETURN(0);
551         }
552
553         (*depth)++;
554         if (flags != LSLF_BAD_INDEX2)
555                 LASSERTF(*depth == 1, "depth = %d\n", *depth);
556
557         /* Handle conflict cases. */
558         switch (lslr->lslr_flags) {
559         case LSLF_NONE:
560         case LSLF_BAD_INDEX2:
561                 /* The existing one is a normal valid object. */
562                 switch (flags) {
563                 case LSLF_NONE:
564                         /* The two 'valid' name entries claims the same
565                          * index, the LFSCK cannot distinguish which one
566                          * is correct. Then remove the master LMV EA to
567                          * make all shards to be visible to client, and
568                          * mark the master MDT-object as read-only. The
569                          * administrator can handle the conflict with
570                          * more human knowledge. */
571                         rc = lfsck_remove_lmv(env, com, dir, lnr);
572                         break;
573                 case LSLF_BAD_INDEX2:
574                         GOTO(out, rc = -EEXIST);
575                 case LSLF_NO_LMVEA:
576
577 no_lmvea:
578                         if (llmv->ll_lmv.lmv_hash_type &
579                             LMV_HASH_FLAG_LOST_LMV) {
580                                 /* If the master LMV EA was re-generated
581                                  * by the former LFSCK reparation, and
582                                  * before such reparation, someone has
583                                  * created the conflict object, but the
584                                  * LFSCK did not detect such conflict,
585                                  * then we have to remove the master
586                                  * LMV EA and mark the master MDT-object
587                                  * as read-only. The administrator can
588                                  * handle the conflict with more human
589                                  * knowledge. */
590                                 rc = lfsck_remove_lmv(env, com, dir, lnr);
591                         } else {
592                                 /* Otherwise, remove the current name entry,
593                                  * and add its FID in the LFSCK tracing file
594                                  * for further processing. */
595                                 rc = lfsck_namespace_trace_update(env, com, fid,
596                                                 LNTF_CHECK_PARENT, true);
597                                 if (rc == 0)
598                                         rc = lfsck_remove_dirent(env, com, dir,
599                                                                  fid, index);
600                         }
601
602                         break;
603                 case LSLF_DANGLING:
604                         /* Remove the current dangling name entry. */
605                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
606                         break;
607                 case LSLF_BAD_INDEX1:
608                         index = lmv->lmv_master_mdt_index;
609                         lmv->lmv_master_mdt_index = shard_idx;
610                         /* The name entry claims an index that is conflict
611                          * with a valid existing name entry, then try the
612                          * index in the lmv recursively. */
613                         rc = lfsck_record_lmv(env, com, dir, lnr, lmv, index,
614                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
615                         lmv->lmv_master_mdt_index = index;
616                         if (rc == -ERANGE || rc == -EEXIST)
617                                 /* The index in the lmv is invalid or
618                                  * also conflict with other. Then we do
619                                  * not know how to resolve the conflict.
620                                  * We will handle it as handle the case
621                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
622                                 rc = lfsck_remove_lmv(env, com, dir, lnr);
623
624                         break;
625                 default:
626                         break;
627                 }
628
629                 break;
630         case LSLF_NO_LMVEA:
631                 /* The existing one has no slave LMV EA. */
632                 switch (flags) {
633                 case LSLF_NONE:
634
635 none:
636                         if (llmv->ll_lmv.lmv_hash_type &
637                             LMV_HASH_FLAG_LOST_LMV) {
638                                 /* If the master LMV EA was re-generated
639                                  * by the former LFSCK reparation, and
640                                  * before such reparation, someone has
641                                  * created the conflict object, but the
642                                  * LFSCK did not detect such conflict,
643                                  * then we have to remove the master
644                                  * LMV EA and mark the master MDT-object
645                                  * as read-only. The administrator can
646                                  * handle the conflict with more human
647                                  * knowledge. */
648                                 rc = lfsck_remove_lmv(env, com, dir, lnr);
649                         } else {
650                                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
651                                 lrls->lrls_fid = lslr->lslr_fid;
652                                 /* Otherwise, remove the existing name entry,
653                                  * and add its FID in the LFSCK tracing file
654                                  * for further processing. Refill the slot
655                                  * with current slave LMV EA. */
656                                 rc = lfsck_namespace_trace_update(env,
657                                                 com, &lrls->lrls_fid,
658                                                 LNTF_CHECK_PARENT, true);
659                                 if (rc == 0)
660                                         rc = lfsck_replace_lmv(env, com, dir,
661                                                 lslr, lnr, lmv, index, flags);
662                         }
663
664                         break;
665                 case LSLF_BAD_INDEX2:
666                         if (flags2 >= lslr->lslr_flags)
667                                 GOTO(out, rc = -EEXIST);
668
669                         goto none;
670                 case LSLF_NO_LMVEA:
671                         goto no_lmvea;
672                 case LSLF_DANGLING:
673                         /* Remove the current dangling name entry. */
674                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
675                         break;
676                 case LSLF_BAD_INDEX1:
677                         index = lmv->lmv_master_mdt_index;
678                         lmv->lmv_master_mdt_index = shard_idx;
679                         /* The name entry claims an index that is conflict
680                          * with a valid existing name entry, then try the
681                          * index in the lmv recursively. */
682                         rc = lfsck_record_lmv(env, com, dir, lnr, lmv, index,
683                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
684                         lmv->lmv_master_mdt_index = index;
685                         if (rc == -ERANGE || rc == -EEXIST) {
686                                 index = shard_idx;
687                                 goto no_lmvea;
688                         }
689
690                         break;
691                 default:
692                         break;
693                 }
694
695                 break;
696         case LSLF_DANGLING:
697                 /* The existing one is a dangling name entry. */
698                 switch (flags) {
699                 case LSLF_NONE:
700                 case LSLF_BAD_INDEX2:
701                 case LSLF_NO_LMVEA:
702                         /* Remove the existing dangling name entry.
703                          * Refill the lslr slot with the given LMV. */
704                         rc = lfsck_replace_lmv(env, com, dir, lslr, lnr,
705                                                lmv, index, flags);
706                         break;
707                 case LSLF_DANGLING:
708                         /* Two dangling name entries conflict,
709                          * remove the current one. */
710                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
711                         break;
712                 case LSLF_BAD_INDEX1:
713                         index = lmv->lmv_master_mdt_index;
714                         lmv->lmv_master_mdt_index = shard_idx;
715                         /* The name entry claims an index that is conflict
716                          * with a valid existing name entry, then try the
717                          * index in the lmv recursively. */
718                         rc = lfsck_record_lmv(env, com, dir, lnr, lmv, index,
719                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
720                         lmv->lmv_master_mdt_index = index;
721                         if (rc == -ERANGE || rc == -EEXIST)
722                                 /* If the index in the lmv is invalid or
723                                  * also conflict with other, then remove
724                                  * the existing dangling name entry.
725                                  * Refill the lslr slot with the given LMV. */
726                                 rc = lfsck_replace_lmv(env, com, dir, lslr, lnr,
727                                                        lmv, shard_idx, flags);
728
729                         break;
730                 default:
731                         break;
732                 }
733
734                 break;
735         case LSLF_BAD_INDEX1: {
736                 if (*depth >= LFSCK_REC_LMV_MAX_DEPTH)
737                         goto conflict;
738
739                 lrls = &lfsck->li_rec_lmv_save[*depth - 1];
740                 lrls->lrls_fid = lnr->lnr_fid;
741                 lrls->lrls_lmv = *lmv;
742
743                 lnr->lnr_fid = lslr->lslr_fid;
744                 lmv->lmv_master_mdt_index = index;
745                 lmv->lmv_stripe_count = lslr->lslr_stripe_count;
746                 lmv->lmv_hash_type = lslr->lslr_hash_type;
747                 index = lslr->lslr_index;
748
749                 /* The existing one has another possible slot,
750                  * try it recursively. */
751                 rc = lfsck_record_lmv(env, com, dir, lnr, lmv, index,
752                                       LSLF_BAD_INDEX2, flags, depth);
753                 *lmv = lrls->lrls_lmv;
754                 lnr->lnr_fid = lrls->lrls_fid;
755                 index = shard_idx;
756                 if (rc != 0) {
757                         if (rc == -ERANGE || rc == -EEXIST)
758                                 goto conflict;
759
760                         break;
761                 }
762
763                 lslr->lslr_fid = *fid;
764                 lslr->lslr_flags = flags;
765                 lslr->lslr_stripe_count = lmv->lmv_stripe_count;
766                 lslr->lslr_index = lmv->lmv_master_mdt_index;
767                 lslr->lslr_hash_type = lmv->lmv_hash_type;
768                 if (flags == LSLF_NONE) {
769                         if (llmv->ll_hash_type == LMV_HASH_TYPE_UNKNOWN &&
770                             lmv_is_known_hash_type(lmv->lmv_hash_type))
771                                 llmv->ll_hash_type = lmv->lmv_hash_type;
772
773                         if (lslr->lslr_stripe_count <= LFSCK_LMV_MAX_STRIPES &&
774                             llmv->ll_max_stripe_count < lslr->lslr_stripe_count)
775                                 llmv->ll_max_stripe_count =
776                                                         lslr->lslr_stripe_count;
777                 }
778
779                 break;
780
781 conflict:
782                 switch (flags) {
783                 case LSLF_NONE:
784                         /* The two 'valid' name entries claims the same
785                          * index, the LFSCK cannot distinguish which one
786                          * is correct. Then remove the master LMV EA to
787                          * make all shards to be visible to client, and
788                          * mark the master MDT-object as read-only. The
789                          * administrator can handle the conflict with
790                          * more human knowledge. */
791                         rc = lfsck_remove_lmv(env, com, dir, lnr);
792                         break;
793                 case LSLF_BAD_INDEX2:
794                         GOTO(out, rc = -EEXIST);
795                 case LSLF_NO_LMVEA:
796                         goto no_lmvea;
797                 case LSLF_DANGLING:
798                         /* Remove the current dangling name entry. */
799                         rc = lfsck_remove_dirent(env, com, dir, fid, index);
800                         break;
801                 case LSLF_BAD_INDEX1:
802                         index = lmv->lmv_master_mdt_index;
803                         lmv->lmv_master_mdt_index = shard_idx;
804                         /* The name entry claims an index that is conflict
805                          * with a valid existing name entry, then try the
806                          * index in the lmv recursively. */
807                         rc = lfsck_record_lmv(env, com, dir, lnr, lmv, index,
808                                 LSLF_BAD_INDEX2, lslr->lslr_flags, depth);
809                         lmv->lmv_master_mdt_index = index;
810                         if (rc == -ERANGE || rc == -EEXIST)
811                                 /* The index in the lmv is invalid or
812                                  * also conflict with other. Then we do
813                                  * not know how to resolve the conflict.
814                                  * We will handle it as handle the case
815                                  * of 'LSLF_NONE' vs 'LSLF_NONE'. */
816                                 rc = lfsck_remove_lmv(env, com, dir, lnr);
817
818                         break;
819                 }
820
821                 break;
822         }
823         default:
824                 break;
825         }
826
827         if (rc < 0)
828                 llmv->ll_failed = 1;
829
830         GOTO(out, rc);
831
832 out:
833         (*depth)--;
834
835         return rc > 0 ? 0 : rc;
836 }
837
838 /**
839  * Read LMV from bottom object, so it doesn't contain stripe FIDs.
840  *
841  * TODO: test migrating/foreign directory lfsck
842  *
843  * \param[in] env       thread env
844  * \param[in] lfsck     lfsck instance
845  * \param[in] obj       dt object
846  * \param[out] lmv      LMV data pointer
847  *
848  * \retval              0 on success
849  * \retval              -ENODATA on no LMV, corrupt LMV, dir is dead or foreign
850  *                      -ev on other failures
851  */
852 int lfsck_read_stripe_lmv(const struct lu_env *env,
853                           struct lfsck_instance *lfsck,
854                           struct dt_object *obj,
855                           struct lmv_mds_md_v1 *lmv)
856 {
857         struct lfsck_thread_info *info = lfsck_env_info(env);
858         struct lu_buf *buf = &info->lti_buf;
859         struct lmv_foreign_md *lfm;
860         int rc;
861
862         /* use bottom object to avoid reading in shard FIDs */
863         obj = lfsck_object_find_bottom(env, lfsck, lu_object_fid(&obj->do_lu));
864         if (IS_ERR(obj))
865                 return PTR_ERR(obj);
866
867         dt_read_lock(env, obj, 0);
868         buf->lb_buf = lmv;
869         buf->lb_len = sizeof(*lmv);
870         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMV);
871         if (unlikely(rc == -ERANGE)) {
872                 buf = &info->lti_big_buf;
873                 /* this may be a foreign LMV */
874                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
875                 if (rc > sizeof(*lmv)) {
876                         int rc1;
877
878                         lu_buf_check_and_alloc(buf, rc);
879                         rc1 = dt_xattr_get(env, obj, buf, XATTR_NAME_LMV);
880                         if (rc != rc1)
881                                 rc = -ENODATA;
882                 } else {
883                         rc = -ENODATA;
884                 }
885         }
886         dt_read_unlock(env, obj);
887
888         lfsck_object_put(env, obj);
889
890         if (rc > offsetof(typeof(*lfm), lfm_value) &&
891             *((__u32 *)buf->lb_buf) == LMV_MAGIC_FOREIGN) {
892                 __u32 value_len;
893
894                 lfm = buf->lb_buf;
895                 value_len = le32_to_cpu(lfm->lfm_length);
896                 CDEBUG(D_INFO,
897                        "foreign LMV EA, magic %x, len %u, type %x, flags %x, for dir "DFID"\n",
898                        le32_to_cpu(lfm->lfm_magic), value_len,
899                        le32_to_cpu(lfm->lfm_type), le32_to_cpu(lfm->lfm_flags),
900                        PFID(lfsck_dto2fid(obj)));
901
902                 if (rc != value_len + offsetof(typeof(*lfm), lfm_value))
903                         CDEBUG(D_LFSCK,
904                                "foreign LMV EA internal size %u does not match EA full size %d for dir "DFID"\n",
905                                value_len, rc, PFID(lfsck_dto2fid(obj)));
906
907                 /* no further usage/decode of foreign LMV outside */
908                 return -ENODATA;
909         }
910
911         if (rc == sizeof(*lmv)) {
912                 rc = 0;
913                 lfsck_lmv_header_le_to_cpu(lmv, lmv);
914                 /* if LMV is corrupt, return -ENODATA */
915                 if (lmv->lmv_magic != LMV_MAGIC_V1 &&
916                     lmv->lmv_magic != LMV_MAGIC_STRIPE) 
917                         rc = -ENODATA;
918         } else if (rc >= 0) {
919                 /* LMV is corrupt */
920                 rc = -ENODATA;
921         }
922
923         return rc;
924 }
925
926 /**
927  * Parse the shard's index from the given shard name.
928  *
929  * The valid shard name/type should be:
930  * 1) The type must be S_IFDIR
931  * 2) The name should be $FID:$index
932  * 3) the index should within valid range.
933  *
934  * \param[in] env       pointer to the thread context
935  * \param[in] name      the shard name
936  * \param[in] namelen   the name length
937  * \param[in] type      the entry's type
938  * \param[in] fid       the entry's FID
939  *
940  * \retval              zero or positive number for the index from the name
941  * \retval              negative error number on failure
942  */
943 int lfsck_shard_name_to_index(const struct lu_env *env, const char *name,
944                               int namelen, __u16 type, const struct lu_fid *fid)
945 {
946         char    *name2  = lfsck_env_info(env)->lti_tmpbuf2;
947         int      len;
948         int      idx    = 0;
949
950         if (!S_ISDIR(type))
951                 return -ENOTDIR;
952
953         LASSERT(name != name2);
954
955         len = snprintf(name2, sizeof(lfsck_env_info(env)->lti_tmpbuf2),
956                        DFID":", PFID(fid));
957         if (namelen < len + 1 || memcmp(name, name2, len) != 0)
958                 return -EINVAL;
959
960         do {
961                 if (!isdigit(name[len]))
962                         return -EINVAL;
963
964                 idx = idx * 10 + name[len++] - '0';
965         } while (len < namelen);
966
967         if (idx >= LFSCK_LMV_MAX_STRIPES)
968                 return -EINVAL;
969
970         return idx;
971 }
972
973 static inline bool lfsck_name_hash_match(struct lmv_mds_md_v1 *lmv,
974                                          const char *name, int namelen)
975 {
976         int idx;
977
978         idx = lmv_name_to_stripe_index_old(lmv, name, namelen);
979         if (idx == lmv->lmv_master_mdt_index)
980                 return true;
981
982         if (!lmv_hash_is_layout_changing(lmv->lmv_hash_type))
983                 return false;
984
985         idx = lmv_name_to_stripe_index(lmv, name, namelen);
986         return (idx == lmv->lmv_master_mdt_index);
987 }
988
989 bool lfsck_is_valid_slave_name_entry(const struct lu_env *env,
990                                      struct lfsck_lmv *llmv,
991                                      const char *name, int namelen)
992 {
993         if (llmv == NULL || !llmv->ll_lmv_slave || !llmv->ll_lmv_verified)
994                 return true;
995
996         return lfsck_name_hash_match(&llmv->ll_lmv, name, namelen);
997 }
998
999 /**
1000  * Check whether the given name is a valid entry under the @parent.
1001  *
1002  * If the @parent is a striped directory then the @child should one
1003  * shard of the striped directory, its name should be $FID:$index.
1004  *
1005  * If the @parent is a shard of a striped directory, then the name hash
1006  * should match the MDT, otherwise it is invalid.
1007  *
1008  * \param[in] env       pointer to the thread context
1009  * \param[in] parent    the parent directory
1010  * \param[in] child     the child object to be checked
1011  * \param[in] cname     the name for the @child in the parent directory
1012  *
1013  * \retval              positive number for invalid name entry
1014  * \retval              0 if the name is valid or uncertain
1015  * \retval              negative error number on failure
1016  */
1017 int lfsck_namespace_check_name(const struct lu_env *env,
1018                                struct lfsck_instance *lfsck,
1019                                struct dt_object *parent,
1020                                struct dt_object *child,
1021                                const struct lu_name *cname)
1022 {
1023         struct lmv_mds_md_v1 *lmv = &lfsck_env_info(env)->lti_lmv;
1024         int rc;
1025
1026         rc = lfsck_read_stripe_lmv(env, lfsck, parent, lmv);
1027         if (rc != 0)
1028                 RETURN(rc == -ENODATA ? 0 : rc);
1029
1030         if (lmv->lmv_magic == LMV_MAGIC_STRIPE) {
1031                 if (!lfsck_is_valid_slave_lmv(lmv))
1032                         return 0;
1033
1034                 if (!lfsck_name_hash_match(lmv, cname->ln_name,
1035                                            cname->ln_namelen))
1036                         return 1;
1037         } else if (lfsck_shard_name_to_index(env, cname->ln_name,
1038                         cname->ln_namelen, lfsck_object_type(child),
1039                         lfsck_dto2fid(child)) < 0) {
1040                 return 1;
1041         }
1042
1043         return 0;
1044 }
1045
1046 /**
1047  * Update the object's LMV EA with the given @lmv.
1048  *
1049  * \param[in] env       pointer to the thread context
1050  * \param[in] com       pointer to the lfsck component
1051  * \param[in] obj       pointer to the object which LMV EA will be updated
1052  * \param[in] lmv       pointer to buffer holding the new LMV EA
1053  * \param[in] locked    whether the caller has held ldlm lock on the @obj or not
1054  *
1055  * \retval              positive number for nothing to be done
1056  * \retval              zero if updated successfully
1057  * \retval              negative error number on failure
1058  */
1059 int lfsck_namespace_update_lmv(const struct lu_env *env,
1060                                struct lfsck_component *com,
1061                                struct dt_object *obj,
1062                                struct lmv_mds_md_v1 *lmv, bool locked)
1063 {
1064         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1065         struct lmv_mds_md_v1            *lmv4   = &info->lti_lmv4;
1066         struct lu_buf                   *buf    = &info->lti_buf;
1067         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1068         struct dt_device                *dev    = lfsck_obj2dev(obj);
1069         struct thandle                  *th     = NULL;
1070         struct lustre_handle             lh     = { 0 };
1071         int                              rc     = 0;
1072         int                              rc1    = 0;
1073         ENTRY;
1074
1075         LASSERT(lmv4 != lmv);
1076
1077         lfsck_lmv_header_cpu_to_le(lmv4, lmv);
1078         lfsck_buf_init(buf, lmv4, sizeof(*lmv4));
1079
1080         if (!locked) {
1081                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1082                                       MDS_INODELOCK_UPDATE |
1083                                       MDS_INODELOCK_XATTR, LCK_EX);
1084                 if (rc != 0)
1085                         GOTO(log, rc);
1086         }
1087
1088         th = dt_trans_create(env, dev);
1089         if (IS_ERR(th))
1090                 GOTO(log, rc = PTR_ERR(th));
1091
1092         /* For remote updating LMV EA, there will be further LFSCK action on
1093          * remote MDT after the updating, so update the LMV EA synchronously. */
1094         if (dt_object_remote(obj))
1095                 th->th_sync = 1;
1096
1097         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th);
1098         if (rc != 0)
1099                 GOTO(stop, rc);
1100
1101         rc = dt_trans_start_local(env, dev, th);
1102         if (rc != 0)
1103                 GOTO(stop, rc);
1104
1105         dt_write_lock(env, obj, 0);
1106         if (unlikely(lfsck_is_dead_obj(obj)))
1107                 GOTO(unlock, rc = 1);
1108
1109         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1110                 GOTO(unlock, rc = 0);
1111
1112         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMV, 0, th);
1113
1114         GOTO(unlock, rc);
1115
1116 unlock:
1117         dt_write_unlock(env, obj);
1118
1119 stop:
1120         rc1 = dt_trans_stop(env, dev, th);
1121         if (rc == 0)
1122                 rc = rc1;
1123
1124 log:
1125         lfsck_ibits_unlock(&lh, LCK_EX);
1126         CDEBUG(D_LFSCK, "%s: namespace LFSCK updated the %s LMV EA "
1127                "for the object "DFID": rc = %d\n",
1128                lfsck_lfsck2name(lfsck),
1129                lmv->lmv_magic == LMV_MAGIC ? "master" : "slave",
1130                PFID(lfsck_dto2fid(obj)), rc);
1131
1132         return rc;
1133 }
1134
1135 /**
1136  * Check whether allow to re-genereate the lost master LMV EA.
1137  *
1138  * If the master MDT-object of the striped directory lost its master LMV EA,
1139  * then before the LFSCK repaired the striped directory, some ones may have
1140  * created some objects (that are not normal shards of the striped directory)
1141  * under the master MDT-object. If such case happend, then the LFSCK cannot
1142  * re-generate the lost master LMV EA to keep those objects to be visible to
1143  * client.
1144  *
1145  * \param[in] env       pointer to the thread context
1146  * \param[in] com       pointer to the lfsck component
1147  * \param[in] obj       pointer to the master MDT-object to be checked
1148  * \param[in] cfid      the shard's FID used for verification
1149  * \param[in] cidx      the shard's index used for verification
1150  *
1151  * \retval              positive number if not allow to re-generate LMV EA
1152  * \retval              zero if allow to re-generate LMV EA
1153  * \retval              negative error number on failure
1154  */
1155 static int lfsck_allow_regenerate_master_lmv(const struct lu_env *env,
1156                                              struct lfsck_component *com,
1157                                              struct dt_object *obj,
1158                                              const struct lu_fid *cfid,
1159                                              __u32 cidx)
1160 {
1161         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1162         struct lu_fid                   *tfid   = &info->lti_fid3;
1163         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1164         struct lu_dirent                *ent    =
1165                         (struct lu_dirent *)info->lti_key;
1166         const struct dt_it_ops          *iops;
1167         struct dt_it                    *di;
1168         __u64                            cookie;
1169         __u32                            args;
1170         int                              rc;
1171         __u16                            type;
1172         ENTRY;
1173
1174         if (unlikely(!dt_try_as_dir(env, obj)))
1175                 RETURN(-ENOTDIR);
1176
1177         /* Check whether the shard and the master MDT-object matches or not. */
1178         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
1179                  PFID(cfid), cidx);
1180         rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
1181                        (const struct dt_key *)info->lti_tmpbuf);
1182         if (rc != 0)
1183                 RETURN(rc);
1184
1185         if (!lu_fid_eq(tfid, cfid))
1186                 RETURN(-ENOENT);
1187
1188         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1189         iops = &obj->do_index_ops->dio_it;
1190         di = iops->init(env, obj, args);
1191         if (IS_ERR(di))
1192                 RETURN(PTR_ERR(di));
1193
1194         rc = iops->load(env, di, 0);
1195         if (rc == 0)
1196                 rc = iops->next(env, di);
1197         else if (rc > 0)
1198                 rc = 0;
1199
1200         if (rc != 0)
1201                 GOTO(out, rc);
1202
1203         do {
1204                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1205                 if (rc == 0)
1206                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1207
1208                 if (rc != 0)
1209                         GOTO(out, rc);
1210
1211                 /* skip dot and dotdot entries */
1212                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1213                         goto next;
1214
1215                 /* If the subdir name does not match the shard name rule, then
1216                  * it is quite possible that it is NOT a shard, but created by
1217                  * someone after the master MDT-object lost the master LMV EA.
1218                  * But it is also possible that the subdir name entry crashed,
1219                  * under such double failure cases, the LFSCK cannot know how
1220                  * to repair the inconsistency. For data safe, the LFSCK will
1221                  * mark the master MDT-object as read-only. The administrator
1222                  * can fix the bad shard name manually, then run LFSCK again.
1223                  *
1224                  * XXX: If the subdir name matches the shard name rule, but it
1225                  *      is not a real shard of the striped directory, instead,
1226                  *      it was created by someone after the master MDT-object
1227                  *      lost the LMV EA, then re-generating the master LMV EA
1228                  *      will cause such subdir to be invisible to client, and
1229                  *      if its index occupies some lost shard index, then the
1230                  *      LFSCK will use it to replace the bad shard, and cause
1231                  *      the subdir (itself) to be invisible for ever. */
1232                 if (lfsck_shard_name_to_index(env, ent->lde_name,
1233                                 ent->lde_namelen, type, &ent->lde_fid) < 0)
1234                         GOTO(out, rc = 1);
1235
1236 next:
1237                 rc = iops->next(env, di);
1238         } while (rc == 0);
1239
1240         GOTO(out, rc = 0);
1241
1242 out:
1243         iops->put(env, di);
1244         iops->fini(env, di);
1245
1246         return rc;
1247 }
1248
1249 /**
1250  * Notify remote LFSCK instance that the object's LMV EA has been updated.
1251  *
1252  * \param[in] env       pointer to the thread context
1253  * \param[in] com       pointer to the lfsck component
1254  * \param[in] obj       pointer to the object on which the LMV EA will be set
1255  * \param[in] event     indicate either master or slave LMV EA has been updated
1256  * \param[in] flags     indicate which element(s) in the LMV EA has been updated
1257  * \param[in] index     the MDT index on which the LFSCK instance to be notified
1258  *
1259  * \retval              positive number if nothing to be done
1260  * \retval              zero for success
1261  * \retval              negative error number on failure
1262  */
1263 static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
1264                                              struct lfsck_component *com,
1265                                              struct dt_object *obj,
1266                                              __u32 event, __u32 flags,
1267                                              __u32 index)
1268 {
1269         struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
1270         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
1271         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1272         struct lfsck_tgt_desc           *ltd    = NULL;
1273         struct ptlrpc_request           *req    = NULL;
1274         int                              rc;
1275         ENTRY;
1276
1277         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
1278         if (ltd == NULL)
1279                 GOTO(out, rc = -ENODEV);
1280
1281         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1282                                    &RQF_LFSCK_NOTIFY);
1283         if (req == NULL)
1284                 GOTO(out, rc = -ENOMEM);
1285
1286         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1287         if (rc != 0) {
1288                 ptlrpc_request_free(req);
1289
1290                 GOTO(out, rc);
1291         }
1292
1293         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1294         memset(lr, 0, sizeof(*lr));
1295         lr->lr_event = event;
1296         lr->lr_index = lfsck_dev_idx(lfsck);
1297         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1298         lr->lr_fid = *fid;
1299         lr->lr_flags = flags;
1300
1301         ptlrpc_request_set_replen(req);
1302         rc = ptlrpc_queue_wait(req);
1303         ptlrpc_req_finished(req);
1304
1305         GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
1306
1307 out:
1308         CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
1309                "object "DFID" on MDT %x remotely with event %u, flags %u: "
1310                "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
1311                event, flags, rc);
1312
1313         if (ltd != NULL)
1314                 lfsck_tgt_put(ltd);
1315
1316         return rc;
1317 }
1318
1319 /**
1320  * Generate request for local LFSCK instance to rescan the striped directory.
1321  *
1322  * \param[in] env       pointer to the thread context
1323  * \param[in] com       pointer to the lfsck component
1324  * \param[in] obj       pointer to the striped directory to be rescanned
1325  *
1326  * \retval              positive number if nothing to be done
1327  * \retval              zero for success
1328  * \retval              negative error number on failure
1329  */
1330 int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
1331                                             struct lfsck_component *com,
1332                                             struct dt_object *obj)
1333 {
1334         struct lfsck_instance      *lfsck = com->lc_lfsck;
1335         struct lfsck_namespace     *ns    = com->lc_file_ram;
1336         struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
1337         struct lfsck_lmv_unit      *llu;
1338         struct lfsck_lmv           *llmv;
1339         struct lfsck_slave_lmv_rec *lslr;
1340         int                         count = 0;
1341         int                         rc;
1342         ENTRY;
1343
1344         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1345                 RETURN(0);
1346
1347         rc = lfsck_read_stripe_lmv(env, lfsck, obj, lmv4);
1348         if (rc != 0)
1349                 RETURN(rc);
1350
1351         OBD_ALLOC_PTR(llu);
1352         if (unlikely(llu == NULL))
1353                 RETURN(-ENOMEM);
1354
1355         if (lmv4->lmv_stripe_count < 1)
1356                 count = LFSCK_LMV_DEF_STRIPES;
1357         else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1358                 count = LFSCK_LMV_MAX_STRIPES;
1359         else
1360                 count = lmv4->lmv_stripe_count;
1361
1362         OBD_ALLOC_PTR_ARRAY_LARGE(lslr, count);
1363         if (lslr == NULL) {
1364                 OBD_FREE_PTR(llu);
1365
1366                 RETURN(-ENOMEM);
1367         }
1368
1369         INIT_LIST_HEAD(&llu->llu_link);
1370         llu->llu_lfsck = lfsck;
1371         llu->llu_obj = lfsck_object_get(obj);
1372         llmv = &llu->llu_lmv;
1373         llmv->ll_lmv_master = 1;
1374         llmv->ll_inline = 1;
1375         atomic_set(&llmv->ll_ref, 1);
1376         llmv->ll_stripes_allocated = count;
1377         llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
1378         llmv->ll_lslr = lslr;
1379         llmv->ll_lmv = *lmv4;
1380
1381         down_write(&com->lc_sem);
1382         if (ns->ln_status != LS_SCANNING_PHASE1 &&
1383             ns->ln_status != LS_SCANNING_PHASE2) {
1384                 ns->ln_striped_dirs_skipped++;
1385                 up_write(&com->lc_sem);
1386                 lfsck_lmv_put(env, llmv);
1387         } else {
1388                 ns->ln_striped_dirs_repaired++;
1389                 llmv->ll_counted = 1;
1390                 spin_lock(&lfsck->li_lock);
1391                 list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
1392                 spin_unlock(&lfsck->li_lock);
1393                 up_write(&com->lc_sem);
1394         }
1395
1396         RETURN(0);
1397 }
1398
1399 /**
1400  * Set master LMV EA for the specified striped directory.
1401  *
1402  * First, if the master MDT-object of a striped directory lost its LMV EA,
1403  * then there may be some users have created some files under the master
1404  * MDT-object directly. Under such case, the LFSCK cannot re-generate LMV
1405  * EA for the master MDT-object, because we should keep the existing files
1406  * to be visible to client. Then the LFSCK will mark the striped directory
1407  * as read-only and keep it there to be handled by administrator manually.
1408  *
1409  * If nobody has created files under the master MDT-object of the striped
1410  * directory, then we will set the master LMV EA and generate a new rescan
1411  * (the striped directory) request that will be handled later by the LFSCK
1412  * instance on the MDT later.
1413  *
1414  * \param[in] env       pointer to the thread context
1415  * \param[in] com       pointer to the lfsck component
1416  * \param[in] obj       pointer to the object on which the LMV EA will be set
1417  * \param[in] lmv       pointer to the buffer holding the new LMV EA
1418  * \param[in] cfid      the shard's FID used for verification
1419  * \param[in] cidx      the shard's index used for verification
1420  * \param[in] flags     to indicate which element(s) in the LMV EA will be set
1421  *
1422  * \retval              positive number if nothing to be done
1423  * \retval              zero for success
1424  * \retval              negative error number on failure
1425  */
1426 static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
1427                                           struct lfsck_component *com,
1428                                           struct dt_object *obj,
1429                                           struct lmv_mds_md_v1 *lmv,
1430                                           const struct lu_fid *cfid,
1431                                           __u32 cidx, __u32 flags)
1432 {
1433         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1434         struct lmv_mds_md_v1            *lmv3   = &info->lti_lmv3;
1435         struct lu_seq_range             *range  = &info->lti_range;
1436         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1437         struct seq_server_site          *ss     = lfsck_dev_site(lfsck);
1438         struct lustre_handle             lh     = { 0 };
1439         int                              pidx   = -1;
1440         int                              rc     = 0;
1441         ENTRY;
1442
1443         fld_range_set_mdt(range);
1444         rc = fld_server_lookup(env, ss->ss_server_fld,
1445                                fid_seq(lfsck_dto2fid(obj)), range);
1446         if (rc != 0)
1447                 GOTO(log, rc);
1448
1449         pidx = range->lsr_index;
1450         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1451                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1452                               LCK_EX);
1453         if (rc != 0)
1454                 GOTO(log, rc);
1455
1456         rc = lfsck_read_stripe_lmv(env, lfsck, obj, lmv3);
1457         if (rc == -ENODATA) {
1458                 if (!(flags & LEF_SET_LMV_ALL))
1459                         GOTO(log, rc);
1460
1461                 *lmv3 = *lmv;
1462         } else if (rc == 0) {
1463                 if (flags & LEF_SET_LMV_ALL)
1464                         GOTO(log, rc = 1);
1465
1466                 if (flags & LEF_SET_LMV_HASH)
1467                         lmv3->lmv_hash_type = lmv->lmv_hash_type;
1468         } else {
1469                 GOTO(log, rc);
1470         }
1471
1472         lmv3->lmv_magic = LMV_MAGIC;
1473         lmv3->lmv_master_mdt_index = pidx;
1474         lmv3->lmv_layout_version++;
1475
1476         if (flags & LEF_SET_LMV_ALL) {
1477                 rc = lfsck_allow_regenerate_master_lmv(env, com, obj,
1478                                                        cfid, cidx);
1479                 if (rc > 0) {
1480                         rc = lfsck_disable_master_lmv(env, com, obj, false);
1481
1482                         GOTO(log, rc = (rc == 0 ? 1 : rc));
1483                 }
1484
1485                 if (rc < 0)
1486                         GOTO(log, rc);
1487
1488                 /* To indicate that the master has ever lost LMV EA. */
1489                 lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
1490         }
1491
1492         rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
1493         if (rc == 0 && flags & LEF_SET_LMV_ALL) {
1494                 if (dt_object_remote(obj))
1495                         rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
1496                                                 LE_SET_LMV_MASTER, 0, pidx);
1497                 else
1498                         rc = lfsck_namespace_notify_lmv_master_local(env, com,
1499                                                                      obj);
1500         }
1501
1502         GOTO(log, rc);
1503
1504 log:
1505         lfsck_ibits_unlock(&lh, LCK_EX);
1506         CDEBUG(D_LFSCK, "%s: namespace LFSCK set master LMV EA for the object "
1507                DFID" on the %s MDT %d, flags %x: rc = %d\n",
1508                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1509                dt_object_remote(obj) ? "remote" : "local", pidx, flags, rc);
1510
1511         if (rc <= 0) {
1512                 struct lfsck_namespace *ns = com->lc_file_ram;
1513
1514                 ns->ln_flags |= LF_INCONSISTENT;
1515         }
1516
1517         return rc;
1518 }
1519
1520 /**
1521  * Repair the bad name hash.
1522  *
1523  * If the name hash of some name entry under the striped directory does not
1524  * match the shard of the striped directory, then the LFSCK will repair the
1525  * inconsistency. Ideally, the LFSCK should migrate the name entry from the
1526  * current MDT to the right MDT (another one), but before the async commit
1527  * finished, the LFSCK will change the striped directory's hash type as
1528  * LMV_HASH_TYPE_UNKNOWN and mark the lmv flags as LMV_HASH_FLAG_BAD_TYPE.
1529  *
1530  * \param[in] env       pointer to the thread context
1531  * \param[in] com       pointer to the lfsck component
1532  * \param[in] shard     pointer to the shard of the striped directory that
1533  *                      contains the bad name entry
1534  * \param[in] llmv      pointer to lfsck LMV EA structure
1535  * \param[in] name      the name of the bad name hash
1536  *
1537  * \retval              positive number if nothing to be done
1538  * \retval              zero for success
1539  * \retval              negative error number on failure
1540  */
1541 int lfsck_namespace_repair_bad_name_hash(const struct lu_env *env,
1542                                          struct lfsck_component *com,
1543                                          struct dt_object *shard,
1544                                          struct lfsck_lmv *llmv,
1545                                          const char *name)
1546 {
1547         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1548         struct lu_fid                   *pfid   = &info->lti_fid3;
1549         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1550         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1551         struct dt_object                *parent = NULL;
1552         int                              rc     = 0;
1553         ENTRY;
1554
1555         rc = dt_lookup(env, shard, (struct dt_rec *)pfid,
1556                        (const struct dt_key *)dotdot);
1557         if (rc != 0 || !fid_is_sane(pfid))
1558                 GOTO(log, rc);
1559
1560         parent = lfsck_object_find_bottom(env, lfsck, pfid);
1561         if (IS_ERR(parent))
1562                 GOTO(log, rc = PTR_ERR(parent));
1563
1564         if (unlikely(!dt_object_exists(parent)))
1565                 /* The parent object was previously accessed when verifying
1566                  * the slave LMV EA.  If this condition is true it is because
1567                  * the striped directory is being removed. */
1568                 GOTO(log, rc = 1);
1569
1570         *lmv2 = llmv->ll_lmv;
1571         lmv2->lmv_hash_type = LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE;
1572         rc = lfsck_namespace_set_lmv_master(env, com, parent, lmv2,
1573                                             lfsck_dto2fid(shard),
1574                                             llmv->ll_lmv.lmv_master_mdt_index,
1575                                             LEF_SET_LMV_HASH);
1576
1577         GOTO(log, rc);
1578
1579 log:
1580         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name hash "
1581                "on the MDT %x, parent "DFID", name %s, shard_%x "DFID
1582                ": rc = %d\n",
1583                lfsck_lfsck2name(lfsck), lfsck_dev_idx(lfsck),
1584                PFID(pfid), name, llmv->ll_lmv.lmv_master_mdt_index,
1585                PFID(lfsck_dto2fid(shard)), rc);
1586
1587         if (parent != NULL && !IS_ERR(parent))
1588                 lfsck_object_put(env, parent);
1589
1590         return rc;
1591 }
1592
1593 /**
1594  * Scan the shard of a striped directory for name hash verification.
1595  *
1596  * During the first-stage scanning, if the LFSCK cannot make sure whether
1597  * the shard of a stripe directory contains valid slave LMV EA or not, then
1598  * it will skip the name hash verification for this shard temporarily, and
1599  * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
1600  * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
1601  * Then in the second-stage scanning, the shard will be re-scanned, and for
1602  * every name entry under the shard, the name hash will be verified, and for
1603  * unmatched name entry, the LFSCK will try to fix it.
1604  *
1605  * \param[in] env       pointer to the thread context
1606  * \param[in] com       pointer to the lfsck component
1607  * \param[in] child     pointer to the directory object to be handled
1608  *
1609  * \retval              positive number for scanning successfully
1610  * \retval              zero for the scanning is paused
1611  * \retval              negative error number on failure
1612  */
1613 int lfsck_namespace_scan_shard(const struct lu_env *env,
1614                                struct lfsck_component *com,
1615                                struct dt_object *child)
1616 {
1617         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1618         struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
1619         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1620         struct lfsck_namespace          *ns     = com->lc_file_ram;
1621         struct ptlrpc_thread            *thread = &lfsck->li_thread;
1622         struct lu_dirent                *ent    =
1623                         (struct lu_dirent *)info->lti_key;
1624         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1625         struct lfsck_lmv                *llmv   = NULL;
1626         const struct dt_it_ops          *iops;
1627         struct dt_it                    *di;
1628         __u64                            cookie;
1629         __u32                            args;
1630         int                              rc;
1631         __u16                            type;
1632         ENTRY;
1633
1634         rc = lfsck_read_stripe_lmv(env, lfsck, child, lmv);
1635         if (rc != 0)
1636                 RETURN(rc == -ENODATA ? 1 : rc);
1637
1638         if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
1639                 RETURN(1);
1640
1641         if (unlikely(!dt_try_as_dir(env, child)))
1642                 RETURN(-ENOTDIR);
1643
1644         OBD_ALLOC_PTR(llmv);
1645         if (llmv == NULL)
1646                 RETURN(-ENOMEM);
1647
1648         llmv->ll_lmv_slave = 1;
1649         llmv->ll_lmv_verified = 1;
1650         llmv->ll_lmv = *lmv;
1651         atomic_set(&llmv->ll_ref, 1);
1652
1653         args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
1654         iops = &child->do_index_ops->dio_it;
1655         di = iops->init(env, child, args);
1656         if (IS_ERR(di))
1657                 GOTO(out, rc = PTR_ERR(di));
1658
1659         rc = iops->load(env, di, 0);
1660         if (rc == 0)
1661                 rc = iops->next(env, di);
1662         else if (rc > 0)
1663                 rc = 0;
1664
1665         while (rc == 0) {
1666                 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
1667                     unlikely(!thread_is_running(thread)))
1668                         GOTO(out, rc = 0);
1669
1670                 rc = iops->rec(env, di, (struct dt_rec *)ent, args);
1671                 if (rc == 0)
1672                         rc = lfsck_unpack_ent(ent, &cookie, &type);
1673
1674                 if (rc != 0) {
1675                         if (bk->lb_param & LPF_FAILOUT)
1676                                 GOTO(out, rc);
1677
1678                         goto next;
1679                 }
1680
1681                 /* skip dot and dotdot entries */
1682                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1683                         goto next;
1684
1685                 if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
1686                                                      ent->lde_namelen)) {
1687                         ns->ln_flags |= LF_INCONSISTENT;
1688                         rc = lfsck_namespace_repair_bad_name_hash(env, com,
1689                                                 child, llmv, ent->lde_name);
1690                         if (rc == 0)
1691                                 ns->ln_name_hash_repaired++;
1692                 }
1693
1694                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1695                         GOTO(out, rc);
1696
1697                 /* Rate control. */
1698                 lfsck_control_speed(lfsck);
1699                 if (unlikely(!thread_is_running(thread)))
1700                         GOTO(out, rc = 0);
1701
1702                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
1703                         spin_lock(&lfsck->li_lock);
1704                         thread_set_flags(thread, SVC_STOPPING);
1705                         spin_unlock(&lfsck->li_lock);
1706
1707                         GOTO(out, rc = -EINVAL);
1708                 }
1709
1710 next:
1711                 rc = iops->next(env, di);
1712         }
1713
1714         GOTO(out, rc);
1715
1716 out:
1717         iops->put(env, di);
1718         iops->fini(env, di);
1719         lfsck_lmv_put(env, llmv);
1720
1721         return rc;
1722 }
1723
1724 /**
1725  * Verify the slave object's (of striped directory) LMV EA.
1726  *
1727  * For the slave object of a striped directory, before traversing the shard
1728  * the LFSCK will verify whether its slave LMV EA matches its parent's master
1729  * LMV EA or not.
1730  *
1731  * \param[in] env       pointer to the thread context
1732  * \param[in] com       pointer to the lfsck component
1733  * \param[in] obj       pointer to the object which LMV EA will be checked
1734  * \param[in] llmv      pointer to buffer holding the slave LMV EA
1735  *
1736  * \retval              positive number if nothing to be done
1737  * \retval              zero for success
1738  * \retval              negative error number on failure
1739  */
1740 int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
1741                                         struct lfsck_component *com,
1742                                         struct dt_object *obj,
1743                                         struct lfsck_lmv *llmv)
1744 {
1745         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1746         char                            *name   = info->lti_key;
1747         char                            *name2;
1748         struct lu_fid                   *pfid   = &info->lti_fid3;
1749         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
1750         struct lu_fid                    tfid;
1751         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1752         struct lmv_mds_md_v1            *clmv   = &llmv->ll_lmv;
1753         struct lmv_mds_md_v1            *plmv   = &info->lti_lmv;
1754         struct dt_object                *parent = NULL;
1755         int                              rc     = 0;
1756         ENTRY;
1757
1758         if (!lfsck_is_valid_slave_lmv(clmv)) {
1759                 rc = lfsck_namespace_trace_update(env, com, cfid,
1760                                         LNTF_UNCERTAIN_LMV, true);
1761
1762                 GOTO(out, rc);
1763         }
1764
1765         rc = dt_lookup(env, obj, (struct dt_rec *)pfid,
1766                        (const struct dt_key *)dotdot);
1767         if (rc != 0 || !fid_is_sane(pfid)) {
1768                 rc = lfsck_namespace_trace_update(env, com, cfid,
1769                                         LNTF_UNCERTAIN_LMV, true);
1770
1771                 GOTO(out, rc);
1772         }
1773
1774         CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_ENGINE_DELAY, cfs_fail_val);
1775
1776         parent = lfsck_object_find_bottom(env, lfsck, pfid);
1777         if (IS_ERR(parent)) {
1778                 rc = lfsck_namespace_trace_update(env, com, cfid,
1779                                         LNTF_UNCERTAIN_LMV, true);
1780
1781                 GOTO(out, rc);
1782         }
1783
1784         if (unlikely(!dt_object_exists(parent)))
1785                 GOTO(out, rc = 1);
1786
1787         if (unlikely(!dt_try_as_dir(env, parent)))
1788                 GOTO(out, rc = -ENOTDIR);
1789
1790         rc = lfsck_read_stripe_lmv(env, lfsck, parent, plmv);
1791         if (rc != 0) {
1792                 int rc1;
1793
1794                 /* If the parent has no LMV EA, then it maybe because:
1795                  * 1) The parent lost the LMV EA.
1796                  * 2) The child claims a wrong (slave) LMV EA. */
1797                 if (rc == -ENODATA)
1798                         rc = lfsck_namespace_set_lmv_master(env, com, parent,
1799                                         clmv, cfid, clmv->lmv_master_mdt_index,
1800                                         LEF_SET_LMV_ALL);
1801                 else
1802                         rc = 0;
1803
1804                 rc1 = lfsck_namespace_trace_update(env, com, cfid,
1805                                                    LNTF_UNCERTAIN_LMV, true);
1806
1807                 GOTO(out, rc = (rc < 0 ? rc : rc1));
1808         }
1809
1810         /* Unmatched magic or stripe count. */
1811         if (unlikely(plmv->lmv_magic != LMV_MAGIC ||
1812                      plmv->lmv_stripe_count != clmv->lmv_stripe_count)) {
1813                 rc = lfsck_namespace_trace_update(env, com, cfid,
1814                                                   LNTF_UNCERTAIN_LMV, true);
1815
1816                 GOTO(out, rc);
1817         }
1818
1819         /* If the master hash type has been set as LMV_HASH_TYPE_UNKNOWN,
1820          * then the slave hash type is not important. */
1821         if ((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) ==
1822             LMV_HASH_TYPE_UNKNOWN &&
1823             plmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE)
1824                 GOTO(out, rc = 0);
1825
1826         /* Unmatched hash type. */
1827         if (unlikely((plmv->lmv_hash_type & LMV_HASH_TYPE_MASK) !=
1828                      (clmv->lmv_hash_type & LMV_HASH_TYPE_MASK))) {
1829                 rc = lfsck_namespace_trace_update(env, com, cfid,
1830                                                   LNTF_UNCERTAIN_LMV, true);
1831
1832                 GOTO(out, rc);
1833         }
1834
1835         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2), DFID":%u",
1836                  PFID(cfid), clmv->lmv_master_mdt_index);
1837         name2 = info->lti_tmpbuf2;
1838
1839         rc = lfsck_links_get_first(env, obj, name, &tfid);
1840         if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, &tfid)) {
1841                 llmv->ll_lmv_verified = 1;
1842
1843                 GOTO(out, rc);
1844         }
1845
1846         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1847                        (const struct dt_key *)name2);
1848         if (rc != 0 || !lu_fid_eq(cfid, &tfid))
1849                 rc = lfsck_namespace_trace_update(env, com, cfid,
1850                                                   LNTF_UNCERTAIN_LMV, true);
1851         else
1852                 llmv->ll_lmv_verified = 1;
1853
1854         GOTO(out, rc);
1855
1856 out:
1857         if (parent != NULL && !IS_ERR(parent))
1858                 lfsck_object_put(env, parent);
1859
1860         return rc;
1861 }
1862
1863 /**
1864  * Double scan the striped directory or the shard.
1865  *
1866  * All the shards under the given striped directory or its shard have
1867  * been scanned, so the LFSCK has got the global knowledge about the LMV
1868  * EA consistency.
1869  *
1870  * If the target is one shard of a striped directory, then only needs to
1871  * update related tracing file.
1872  *
1873  * If the target is the master MDT-object of a striped directory, then the
1874  * LFSCK will make the decision about whether the master LMV EA is invalid
1875  * or not, and repair it if inconsistent; for every shard of the striped
1876  * directory, whether the slave LMV EA is invalid or not, and repair it if
1877  * inconsistent.
1878  *
1879  * \param[in] env       pointer to the thread context
1880  * \param[in] com       pointer to the lfsck component
1881  * \param[in] lnr       pointer to the namespace request that contains the
1882  *                      striped directory or the shard
1883  *
1884  * \retval              zero for success
1885  * \retval              negative error number on failure
1886  */
1887 int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
1888                                        struct lfsck_component *com,
1889                                        struct lfsck_namespace_req *lnr)
1890 {
1891         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1892         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1893         struct lfsck_namespace          *ns     = com->lc_file_ram;
1894         struct lfsck_lmv                *llmv   = lnr->lnr_lmv;
1895         struct lmv_mds_md_v1            *lmv    = &llmv->ll_lmv;
1896         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1897         struct lfsck_assistant_object   *lso    = lnr->lnr_lar.lar_parent;
1898         const struct lu_fid             *pfid   = &lso->lso_fid;
1899         struct dt_object                *dir    = NULL;
1900         struct dt_object                *obj    = NULL;
1901         struct lu_seq_range             *range  = &info->lti_range;
1902         struct seq_server_site          *ss     = lfsck_dev_site(lfsck);
1903         __u32                            stripe_count;
1904         __u32                            hash_type;
1905         int                              rc     = 0;
1906         int                              i;
1907         ENTRY;
1908
1909         if (llmv->ll_lmv_slave) {
1910                 if (llmv->ll_lmv_verified) {
1911                         ns->ln_striped_shards_scanned++;
1912                         lfsck_namespace_trace_update(env, com, pfid,
1913                                         LNTF_UNCERTAIN_LMV |
1914                                         LNTF_RECHECK_NAME_HASH, false);
1915                 }
1916
1917                 RETURN(0);
1918         }
1919
1920         /* Either the striped directory has been disabled or only part of
1921          * the striped directory has been scanned. The LFSCK cannot repair
1922          * anything based on incomplete knowledge. So skip it. */
1923         if (llmv->ll_ignore || llmv->ll_exit_value <= 0)
1924                 RETURN(0);
1925
1926         /* There has been some failure, so the LFSCK cannot know whether
1927          * it has got the global knowledge about the LMV EA consistency or not,
1928          * and it cannot make repairs based on incomplete knowledge. */
1929         if (llmv->ll_failed) {
1930                 ns->ln_striped_dirs_scanned++;
1931                 ns->ln_striped_dirs_failed++;
1932
1933                 RETURN(0);
1934         }
1935
1936         if (lmv->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
1937                 stripe_count = max(llmv->ll_max_filled_off + 1,
1938                                    llmv->ll_max_stripe_count);
1939         else
1940                 stripe_count = max(llmv->ll_max_filled_off + 1,
1941                                    lmv->lmv_stripe_count);
1942
1943         if (lmv->lmv_stripe_count != stripe_count) {
1944                 lmv->lmv_stripe_count = stripe_count;
1945                 llmv->ll_lmv_updated = 1;
1946         }
1947
1948         if (!lmv_is_known_hash_type(lmv->lmv_hash_type) &&
1949             !(lmv->lmv_hash_type & LMV_HASH_FLAG_BAD_TYPE) &&
1950             lmv_is_known_hash_type(llmv->ll_hash_type)) {
1951                 hash_type = llmv->ll_hash_type & LMV_HASH_TYPE_MASK;
1952                 lmv->lmv_hash_type = llmv->ll_hash_type;
1953                 llmv->ll_lmv_updated = 1;
1954         } else {
1955                 hash_type = lmv->lmv_hash_type & LMV_HASH_TYPE_MASK;
1956                 if (!lmv_is_known_hash_type(hash_type))
1957                         hash_type = LMV_HASH_TYPE_UNKNOWN;
1958         }
1959
1960         if (llmv->ll_lmv_updated) {
1961                 if (dir == NULL) {
1962                         dir = lfsck_assistant_object_load(env, lfsck, lso);
1963                         if (IS_ERR(dir)) {
1964                                 rc = PTR_ERR(dir);
1965
1966                                 RETURN(rc == -ENOENT ? 0 : rc);
1967                         }
1968                 }
1969
1970                 lmv->lmv_layout_version++;
1971                 rc = lfsck_namespace_update_lmv(env, com, dir, lmv, false);
1972                 if (rc != 0)
1973                         RETURN(rc);
1974
1975                 ns->ln_striped_dirs_scanned++;
1976                 if (!llmv->ll_counted)
1977                         ns->ln_striped_dirs_repaired++;
1978         }
1979
1980         fld_range_set_mdt(range);
1981         for (i = 0; i <= llmv->ll_max_filled_off; i++) {
1982                 struct lfsck_slave_lmv_rec *lslr = llmv->ll_lslr + i;
1983                 const struct lu_fid *cfid = &lslr->lslr_fid;
1984                 const struct lu_name *cname;
1985                 struct linkea_data ldata = { NULL };
1986                 int rc1 = 0;
1987                 bool repair_linkea = false;
1988                 bool repair_lmvea = false;
1989                 bool rename = false;
1990                 bool create = false;
1991                 bool linkea_repaired = false;
1992                 bool lmvea_repaired = false;
1993                 bool rename_repaired = false;
1994                 bool create_repaired = false;
1995
1996                 /* LMV EA hole. */
1997                 if (fid_is_zero(cfid))
1998                         continue;
1999
2000                 lnr->lnr_fid = *cfid;
2001                 lnr->lnr_namelen = snprintf(lnr->lnr_name,
2002                                             lnr->lnr_size - sizeof(*lnr),
2003                                             DFID":%u", PFID(cfid), i);
2004                 cname = lfsck_name_get_const(env, lnr->lnr_name,
2005                                              lnr->lnr_namelen);
2006                 obj = lfsck_object_find_bottom(env, lfsck, cfid);
2007                 if (IS_ERR(obj)) {
2008                         if (dir == NULL) {
2009                                 dir = lfsck_assistant_object_load(env, lfsck,
2010                                                                   lso);
2011                                 if (IS_ERR(dir)) {
2012                                         if (PTR_ERR(dir) == -ENOENT)
2013                                                 RETURN(0);
2014
2015                                         dir = NULL;
2016                                 }
2017                         } else if (lfsck_is_dead_obj(dir)) {
2018                                 GOTO(out, rc = 0);
2019                         }
2020
2021                         rc1 = PTR_ERR(obj);
2022                         goto next;
2023                 }
2024
2025                 switch (lslr->lslr_flags) {
2026                 case LSLF_NONE:
2027                         if (llmv->ll_inline ||
2028                             lslr->lslr_stripe_count != stripe_count ||
2029                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
2030                              hash_type)
2031                                 repair_lmvea = true;
2032                         break;
2033                 case LSLF_BAD_INDEX2:
2034                         /* The index in the slave LMV EA is right,
2035                          * the name entry should be updated. */
2036                         rename = true;
2037                         snprintf(info->lti_tmpbuf2, sizeof(info->lti_tmpbuf2),
2038                                  DFID":%u", PFID(cfid), lslr->lslr_index);
2039                         if (llmv->ll_inline ||
2040                             lslr->lslr_stripe_count != stripe_count ||
2041                             (lslr->lslr_hash_type & LMV_HASH_TYPE_MASK) !=
2042                              hash_type)
2043                                 repair_lmvea = true;
2044                         break;
2045                 case LSLF_BAD_INDEX1:
2046                         /* The index in the name entry is right,
2047                          * the slave LMV EA should be updated. */
2048                 case LSLF_NO_LMVEA:
2049                         repair_lmvea = true;
2050                         break;
2051                 case LSLF_DANGLING:
2052                         create = true;
2053                         goto repair;
2054                 default:
2055                         break;
2056                 }
2057
2058                 rc1 = lfsck_links_read_with_rec(env, obj, &ldata);
2059                 if (rc1 == -ENOENT) {
2060                         create = true;
2061                         goto repair;
2062                 }
2063
2064                 if (rc1 == -EINVAL || rc1 == -ENODATA) {
2065                         repair_linkea = true;
2066                         goto repair;
2067                 }
2068
2069                 if (rc1 != 0)
2070                         goto next;
2071
2072                 if (ldata.ld_leh->leh_reccount != 1) {
2073                         repair_linkea = true;
2074                         goto repair;
2075                 }
2076
2077                 rc1 = linkea_links_find(&ldata, cname, pfid);
2078                 if (rc1 != 0)
2079                         repair_linkea = true;
2080
2081 repair:
2082                 if (create) {
2083                         if (dir == NULL) {
2084                                 dir = lfsck_assistant_object_load(env, lfsck,
2085                                                                   lso);
2086                                 if (IS_ERR(dir)) {
2087                                         rc1 = PTR_ERR(dir);
2088
2089                                         if (rc1 == -ENOENT)
2090                                                 GOTO(out, rc = 0);
2091
2092                                         dir = NULL;
2093                                         goto next;
2094                                 }
2095                         }
2096
2097                         rc1 = lfsck_namespace_repair_dangling(env, com, dir,
2098                                                               obj, lnr);
2099                         if (rc1 >= 0) {
2100                                 create_repaired = true;
2101                                 if (rc == 0)
2102                                         ns->ln_dangling_repaired++;
2103                         }
2104                 }
2105
2106                 if (repair_lmvea) {
2107                         *lmv2 = *lmv;
2108                         lmv2->lmv_magic = LMV_MAGIC_STRIPE;
2109                         lmv2->lmv_stripe_count = stripe_count;
2110                         lmv2->lmv_master_mdt_index = i;
2111                         lmv2->lmv_hash_type = hash_type;
2112
2113                         rc1 = lfsck_namespace_update_lmv(env, com, obj,
2114                                                          lmv2, false);
2115                         if (rc1 < 0)
2116                                 goto next;
2117
2118                         if (dt_object_remote(obj)) {
2119                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2120                                         fid_seq(lfsck_dto2fid(obj)), range);
2121                                 if (rc1 != 0)
2122                                         goto next;
2123
2124                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2125                                                 com, obj, LE_SET_LMV_SLAVE, 0,
2126                                                 range->lsr_index);
2127                         } else {
2128                                 ns->ln_striped_shards_repaired++;
2129                                 rc1 = lfsck_namespace_trace_update(env, com,
2130                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2131                         }
2132
2133                         if (rc1 < 0)
2134                                 goto next;
2135
2136                         if (rc1 >= 0)
2137                                 lmvea_repaired = true;
2138                 } else if (llmv->ll_inline) {
2139                         if (dt_object_remote(obj)) {
2140                                 rc1 = fld_server_lookup(env, ss->ss_server_fld,
2141                                         fid_seq(lfsck_dto2fid(obj)), range);
2142                                 if (rc1 != 0)
2143                                         goto next;
2144
2145                                 /* The slave LMV EA on the remote shard is
2146                                  * correct, just notify the LFSCK instance
2147                                  * on such MDT to re-verify the name_hash. */
2148                                 rc1 = lfsck_namespace_notify_lmv_remote(env,
2149                                                 com, obj, LE_SET_LMV_SLAVE,
2150                                                 LEF_RECHECK_NAME_HASH,
2151                                                 range->lsr_index);
2152                         } else {
2153                                 rc1 = lfsck_namespace_trace_update(env, com,
2154                                         cfid, LNTF_RECHECK_NAME_HASH, true);
2155                         }
2156
2157                         if (rc1 < 0)
2158                                 goto next;
2159                 }
2160
2161                 if (rename) {
2162                         if (dir == NULL) {
2163                                 dir = lfsck_assistant_object_load(env, lfsck,
2164                                                                   lso);
2165                                 if (IS_ERR(dir)) {
2166                                         rc1 = PTR_ERR(dir);
2167
2168                                         if (rc1 == -ENOENT)
2169                                                 GOTO(out, rc = 0);
2170
2171                                         dir = NULL;
2172                                         goto next;
2173                                 }
2174                         }
2175
2176                         rc1 = lfsck_namespace_repair_dirent(env, com, dir, obj,
2177                                         info->lti_tmpbuf2, lnr->lnr_name,
2178                                         lnr->lnr_type, true, false);
2179                         if (rc1 >= 0) {
2180                                 rename_repaired = true;
2181                                 if (rc1 > 0) {
2182                                         ns->ln_dirent_repaired++;
2183                                         rc1 = lfsck_namespace_trace_update(env,
2184                                                 com, cfid,
2185                                                 LNTF_RECHECK_NAME_HASH, true);
2186                                 }
2187                         }
2188
2189                         if (rc1 < 0)
2190                                 goto next;
2191                 }
2192
2193                 if (repair_linkea) {
2194                         struct lustre_handle lh = { 0 };
2195
2196                         if (dir == NULL) {
2197                                 dir = lfsck_assistant_object_load(env, lfsck,
2198                                                                   lso);
2199                                 if (IS_ERR(dir)) {
2200                                         rc1 = PTR_ERR(dir);
2201
2202                                         if (rc1 == -ENOENT)
2203                                                 GOTO(out, rc = 0);
2204
2205                                         dir = NULL;
2206                                         goto next;
2207                                 }
2208                         }
2209
2210                         rc1 = linkea_links_new(&ldata, &info->lti_big_buf,
2211                                                cname, lfsck_dto2fid(dir));
2212                         if (rc1 != 0)
2213                                 goto next;
2214
2215                         rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
2216                                                MDS_INODELOCK_UPDATE |
2217                                                MDS_INODELOCK_XATTR, LCK_EX);
2218                         if (rc1 != 0)
2219                                 goto next;
2220
2221                         rc1 = lfsck_namespace_rebuild_linkea(env, com, obj,
2222                                                              &ldata);
2223                         lfsck_ibits_unlock(&lh, LCK_EX);
2224                         if (rc1 >= 0) {
2225                                 linkea_repaired = true;
2226                                 if (rc1 > 0)
2227                                         ns->ln_linkea_repaired++;
2228                         }
2229                 }
2230
2231 next:
2232                 if (create || rename || repair_linkea || repair_lmvea) {
2233                         CDEBUG(D_LFSCK, "%s: namespace LFSCK repair the shard "
2234                                "%d "DFID" of the striped directory "DFID" with "
2235                                "dangling %s/%s, rename %s/%s, llinkea %s/%s, "
2236                                "repair_lmvea %s/%s: rc = %d\n",
2237                                lfsck_lfsck2name(lfsck),
2238                                i, PFID(cfid), PFID(pfid),
2239                                create ? "yes" : "no",
2240                                create_repaired ? "yes" : "no",
2241                                rename ? "yes" : "no",
2242                                rename_repaired ? "yes" : "no",
2243                                repair_linkea ? "yes" : "no",
2244                                linkea_repaired ? "yes" : "no",
2245                                repair_lmvea ? "yes" : "no",
2246                                lmvea_repaired ? "yes" : "no", rc1);
2247                 }
2248
2249                 if (obj != NULL && !IS_ERR(obj)) {
2250                         lfsck_object_put(env, obj);
2251                         obj = NULL;
2252                 }
2253
2254                 if (rc1 < 0) {
2255                         rc = rc1;
2256                         ns->ln_striped_shards_failed++;
2257                 }
2258         }
2259
2260         GOTO(out, rc);
2261
2262 out:
2263         if (obj != NULL && !IS_ERR(obj))
2264                 lfsck_object_put(env, obj);
2265
2266         if (dir != NULL && !IS_ERR(dir))
2267                 lfsck_object_put(env, dir);
2268
2269         return rc;
2270 }
2271
2272 /**
2273  * Verify the shard's name entry under the striped directory.
2274  *
2275  * Before all shards of the striped directory scanned, the LFSCK cannot
2276  * know whether the master LMV EA is valid or not, and also cannot know
2277  * how to repair an invalid shard exactly. For example, the stripe index
2278  * stored in the shard's name does not match the stripe index stored in
2279  * the slave LMV EA, then the LFSCK cannot know which one is correct.
2280  * If the LFSCK just assumed one is correct, and fixed the other, then
2281  * as the LFSCK processing, it may find that the former reparation is
2282  * wrong and have to roll back. Unfortunately, if some applications saw
2283  * the changes and made further modification based on such changes, then
2284  * the roll back is almost impossible.
2285  *
2286  * To avoid above trouble, the LFSCK will scan the master object of the
2287  * striped directory twice, that is NOT the same as normal two-stages
2288  * scanning, the double scanning the striped directory will happen both
2289  * during the first-stage scanning:
2290  *
2291  * 1) When the striped directory is opened for scanning, the LFSCK will
2292  *    iterate each shard in turn, and records its slave LMV EA in the
2293  *    lfsck_lmv::ll_lslr. In this step, if the 'shard' (may be fake
2294  *    shard) name does not match the shard naming rule, for example, it
2295  *    does not contains the shard's FID, or not contains index, then we
2296  *    can remove the bad name entry directly. But if the name is valid,
2297  *    but the shard has no slave LMV EA or the slave LMV EA does not
2298  *    match its name, then we just record related information in the
2299  *    lfsck_lmv::ll_lslr in RAM.
2300  *
2301  * 2) When all the known shards have been scanned, then the engine will
2302  *    generate a dummy request (via lfsck_namespace_close_dir) to tell
2303  *    the assistant thread that all the known shards have been scanned.
2304  *    Since the assistant has got the global knowledge about the index
2305  *    conflict, stripe count, hash type, and so on. Then the assistant
2306  *    thread will scan the lfsck_lmv::ll_lslr, and for every shard in
2307  *    the record, check and repair inconsistency.
2308  *
2309  * Generally, the stripe directory has only several shards, and there
2310  * will NOT be a lof of striped directory. So double scanning striped
2311  * directory will not much affect the LFSCK performance.
2312  *
2313  * \param[in] env       pointer to the thread context
2314  * \param[in] com       pointer to the lfsck component
2315  * \param[in] lnr       pointer to the namespace request that contains the
2316  *                      shard's name, parent object, parent's LMV, and ect.
2317  *
2318  * \retval              zero for success
2319  * \retval              negative error number on failure
2320  */
2321 int lfsck_namespace_handle_striped_master(const struct lu_env *env,
2322                                           struct lfsck_component *com,
2323                                           struct lfsck_namespace_req *lnr)
2324 {
2325         struct lfsck_thread_info   *info        = lfsck_env_info(env);
2326         struct lmv_mds_md_v1       *lmv         = &info->lti_lmv;
2327         struct lfsck_instance      *lfsck       = com->lc_lfsck;
2328         struct lfsck_namespace     *ns          = com->lc_file_ram;
2329         struct lfsck_lmv           *llmv        = lnr->lnr_lmv;
2330         struct lfsck_assistant_object *lso      = lnr->lnr_lar.lar_parent;
2331         const struct lu_fid        *pfid        = &lso->lso_fid;
2332         struct dt_object           *dir;
2333         struct dt_object           *obj         = NULL;
2334         struct dt_device           *dev         = NULL;
2335         int                         shard_idx   = 0;
2336         int                         stripe      = 0;
2337         int                         rc          = 0;
2338         int                         depth       = 0;
2339         bool                        repaired    = false;
2340         enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
2341         ENTRY;
2342
2343         if (unlikely(llmv->ll_ignore))
2344                 RETURN(0);
2345
2346         dir = lfsck_assistant_object_load(env, lfsck, lso);
2347         if (IS_ERR(dir)) {
2348                 rc = PTR_ERR(dir);
2349
2350                 RETURN(rc == -ENOENT ? 0 : rc);
2351         }
2352
2353         shard_idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
2354         if (shard_idx < 0)
2355                 GOTO(fail_lmv, rc = shard_idx);
2356
2357         if (shard_idx == lfsck_dev_idx(lfsck)) {
2358                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
2359                         GOTO(out, rc = 0);
2360
2361                 dev = lfsck->li_bottom;
2362         } else {
2363                 struct lfsck_tgt_desc *ltd;
2364
2365                 /* Usually, some local filesystem consistency verification
2366                  * tools can guarantee the local namespace tree consistenct.
2367                  * So the LFSCK will only verify the remote directory. */
2368                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
2369                         rc = lfsck_namespace_trace_update(env, com, pfid,
2370                                                 LNTF_CHECK_PARENT, true);
2371
2372                         GOTO(out, rc);
2373                 }
2374
2375                 ltd = lfsck_ltd2tgt(&lfsck->li_mdt_descs, shard_idx);
2376                 if (unlikely(ltd == NULL)) {
2377                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
2378                                "did not join the namespace LFSCK\n",
2379                                lfsck_lfsck2name(lfsck), shard_idx);
2380                         lfsck_lad_set_bitmap(env, com, shard_idx);
2381
2382                         GOTO(fail_lmv, rc = -ENODEV);
2383                 }
2384
2385                 dev = ltd->ltd_tgt;
2386         }
2387
2388         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
2389         if (IS_ERR(obj)) {
2390                 if (lfsck_is_dead_obj(dir))
2391                         RETURN(0);
2392
2393                 GOTO(fail_lmv, rc = PTR_ERR(obj));
2394         }
2395
2396         if (!dt_object_exists(obj)) {
2397                 stripe = lfsck_shard_name_to_index(env, lnr->lnr_name,
2398                                 lnr->lnr_namelen, lnr->lnr_type, &lnr->lnr_fid);
2399                 if (stripe < 0) {
2400                         type = LNIT_BAD_DIRENT;
2401
2402                         GOTO(out, rc = 0);
2403                 }
2404
2405 dangling:
2406                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
2407                 if (rc == 0) {
2408                         memset(lmv, 0, sizeof(*lmv));
2409                         lmv->lmv_magic = LMV_MAGIC;
2410                         rc = lfsck_record_lmv(env, com, dir, lnr, lmv, stripe,
2411                                               LSLF_DANGLING, LSLF_NONE, &depth);
2412                 }
2413
2414                 GOTO(out, rc);
2415         }
2416
2417         stripe = lfsck_shard_name_to_index(env, lnr->lnr_name, lnr->lnr_namelen,
2418                                            lfsck_object_type(obj),
2419                                            &lnr->lnr_fid);
2420         if (stripe < 0) {
2421                 type = LNIT_BAD_DIRENT;
2422
2423                 GOTO(out, rc = 0);
2424         }
2425
2426         rc = lfsck_read_stripe_lmv(env, lfsck, obj, lmv);
2427         if (unlikely(rc == -ENOENT))
2428                 /* It may happen when the remote object has been removed,
2429                  * but the local MDT does not aware of that. */
2430                 goto dangling;
2431
2432         if (rc == -ENODATA)
2433                 rc = lfsck_record_lmv(env, com, dir, lnr, lmv, stripe,
2434                                       LSLF_NO_LMVEA, LSLF_NONE, &depth);
2435         else if (rc == 0)
2436                 rc = lfsck_record_lmv(env, com, dir, lnr, lmv, stripe,
2437                                       lmv->lmv_master_mdt_index != stripe ?
2438                                       LSLF_BAD_INDEX1 : LSLF_NONE, LSLF_NONE,
2439                                       &depth);
2440
2441         GOTO(out, rc);
2442
2443 fail_lmv:
2444         llmv->ll_failed = 1;
2445
2446 out:
2447         if (rc >= 0 && type == LNIT_NONE && !S_ISDIR(lnr->lnr_type))
2448                 type = LNIT_BAD_TYPE;
2449
2450         switch (type) {
2451         case LNIT_BAD_TYPE:
2452                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2453                                                    lnr->lnr_name, lnr->lnr_name,
2454                                                    lnr->lnr_type, true, false);
2455                 if (rc > 0)
2456                         repaired = true;
2457                 break;
2458         case LNIT_BAD_DIRENT:
2459                 rc = lfsck_namespace_repair_dirent(env, com, dir, obj,
2460                                                    lnr->lnr_name, lnr->lnr_name,
2461                                                    lnr->lnr_type, false, false);
2462                 if (rc > 0)
2463                         repaired = true;
2464                 break;
2465         default:
2466                 break;
2467         }
2468
2469         if (rc < 0) {
2470                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
2471                        "the shard: "DFID", parent "DFID", name %.*s: rc = %d\n",
2472                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
2473                        PFID(pfid), lnr->lnr_namelen, lnr->lnr_name, rc);
2474
2475                 if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
2476                      rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
2477                      rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
2478                     dev != NULL && dev != lfsck->li_bottom)
2479                         lfsck_lad_set_bitmap(env, com, shard_idx);
2480
2481                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
2482                         rc = 0;
2483         } else {
2484                 if (repaired) {
2485                         ns->ln_items_repaired++;
2486
2487                         switch (type) {
2488                         case LNIT_BAD_TYPE:
2489                                 ns->ln_bad_type_repaired++;
2490                                 break;
2491                         case LNIT_BAD_DIRENT:
2492                                 ns->ln_dirent_repaired++;
2493                                 break;
2494                         default:
2495                                 break;
2496                         }
2497                 }
2498
2499                 rc = 0;
2500         }
2501
2502         if (obj != NULL && !IS_ERR(obj))
2503                 lfsck_object_put(env, obj);
2504
2505         lfsck_object_put(env, dir);
2506
2507         return rc;
2508 }