Whamcloud - gitweb
b5e5d80c5024ef80c0af7af1b26ae1bf62fcddea
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/*
 * Magic value kept in lfsck_namespace::ln_magic; a loaded record whose
 * magic differs from it is treated as broken.
 */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/*
 * Positive results of lfsck_namespace_check_exist(), telling the caller
 * which state the checked name entry / object was found in.
 */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,

        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,

        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};

/* Name of the namespace LFSCK component. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 static struct lfsck_namespace_req *
55 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
56                                    struct lu_dirent *ent, __u16 type)
57 {
58         struct lfsck_namespace_req *lnr;
59         int                         size;
60
61         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
62         OBD_ALLOC(lnr, size);
63         if (lnr == NULL)
64                 return ERR_PTR(-ENOMEM);
65
66         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
67         lu_object_get(&lfsck->li_obj_dir->do_lu);
68         lnr->lnr_obj = lfsck->li_obj_dir;
69         lnr->lnr_fid = ent->lde_fid;
70         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
71         lnr->lnr_dir_cookie = ent->lde_hash;
72         lnr->lnr_attr = ent->lde_attrs;
73         lnr->lnr_size = size;
74         lnr->lnr_type = type;
75         lnr->lnr_namelen = ent->lde_namelen;
76         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
77
78         return lnr;
79 }
80
81 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
82                                                struct lfsck_assistant_req *lar)
83 {
84         struct lfsck_namespace_req *lnr =
85                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
86
87         lu_object_put(env, &lnr->lnr_obj->do_lu);
88         OBD_FREE(lnr, lnr->lnr_size);
89 }
90
91 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
92                                       struct lfsck_namespace *src)
93 {
94         dst->ln_magic = le32_to_cpu(src->ln_magic);
95         dst->ln_status = le32_to_cpu(src->ln_status);
96         dst->ln_flags = le32_to_cpu(src->ln_flags);
97         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
98         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
99         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
100         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
101         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
102         dst->ln_time_last_checkpoint =
103                                 le64_to_cpu(src->ln_time_last_checkpoint);
104         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
105                                  &src->ln_pos_latest_start);
106         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
107                                  &src->ln_pos_last_checkpoint);
108         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
109                                  &src->ln_pos_first_inconsistent);
110         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
111         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
112         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
113         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
114         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
115         dst->ln_objs_repaired_phase2 =
116                                 le64_to_cpu(src->ln_objs_repaired_phase2);
117         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
118         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
119         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
120                       &src->ln_fid_latest_scanned_phase2);
121         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
122         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
123         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
124         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
125         dst->ln_unknown_inconsistency =
126                                 le64_to_cpu(src->ln_unknown_inconsistency);
127         dst->ln_unmatched_pairs_repaired =
128                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
129         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
130         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
131         dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
132         dst->ln_lost_dirent_repaired =
133                                 le64_to_cpu(src->ln_lost_dirent_repaired);
134 }
135
136 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
137                                       struct lfsck_namespace *src)
138 {
139         dst->ln_magic = cpu_to_le32(src->ln_magic);
140         dst->ln_status = cpu_to_le32(src->ln_status);
141         dst->ln_flags = cpu_to_le32(src->ln_flags);
142         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
143         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
144         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
145         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
146         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
147         dst->ln_time_last_checkpoint =
148                                 cpu_to_le64(src->ln_time_last_checkpoint);
149         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
150                                  &src->ln_pos_latest_start);
151         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
152                                  &src->ln_pos_last_checkpoint);
153         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
154                                  &src->ln_pos_first_inconsistent);
155         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
156         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
157         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
158         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
159         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
160         dst->ln_objs_repaired_phase2 =
161                                 cpu_to_le64(src->ln_objs_repaired_phase2);
162         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
163         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
164         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
165                       &src->ln_fid_latest_scanned_phase2);
166         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
167         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
168         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
169         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
170         dst->ln_unknown_inconsistency =
171                                 cpu_to_le64(src->ln_unknown_inconsistency);
172         dst->ln_unmatched_pairs_repaired =
173                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
174         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
175         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
176         dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
177         dst->ln_lost_dirent_repaired =
178                                 cpu_to_le64(src->ln_lost_dirent_repaired);
179 }
180
181 static void lfsck_namespace_record_failure(const struct lu_env *env,
182                                            struct lfsck_instance *lfsck,
183                                            struct lfsck_namespace *ns)
184 {
185         struct lfsck_position pos;
186
187         ns->ln_items_failed++;
188         lfsck_pos_fill(env, lfsck, &pos, false);
189         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
190             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
191                 ns->ln_pos_first_inconsistent = pos;
192
193                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
194                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
195                        lfsck_lfsck2name(lfsck),
196                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
197                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
198                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
199         }
200 }
201
202 /**
203  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
204  * \retval 0: succeed.
205  * \retval -ve: failed cases.
206  */
207 static int lfsck_namespace_load(const struct lu_env *env,
208                                 struct lfsck_component *com)
209 {
210         int len = com->lc_file_size;
211         int rc;
212
213         rc = dt_xattr_get(env, com->lc_obj,
214                           lfsck_buf_get(env, com->lc_file_disk, len),
215                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
216         if (rc == len) {
217                 struct lfsck_namespace *ns = com->lc_file_ram;
218
219                 lfsck_namespace_le_to_cpu(ns,
220                                 (struct lfsck_namespace *)com->lc_file_disk);
221                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
222                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
223                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
224                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
225                         rc = 1;
226                 } else {
227                         rc = 0;
228                 }
229         } else if (rc != -ENODATA) {
230                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
231                        "expected = %d: rc = %d\n",
232                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
233                 if (rc >= 0)
234                         rc = 1;
235         }
236         return rc;
237 }
238
239 static int lfsck_namespace_store(const struct lu_env *env,
240                                  struct lfsck_component *com, bool init)
241 {
242         struct dt_object        *obj    = com->lc_obj;
243         struct lfsck_instance   *lfsck  = com->lc_lfsck;
244         struct thandle          *handle;
245         int                      len    = com->lc_file_size;
246         int                      rc;
247         ENTRY;
248
249         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
250                                   (struct lfsck_namespace *)com->lc_file_ram);
251         handle = dt_trans_create(env, lfsck->li_bottom);
252         if (IS_ERR(handle))
253                 GOTO(log, rc = PTR_ERR(handle));
254
255         rc = dt_declare_xattr_set(env, obj,
256                                   lfsck_buf_get(env, com->lc_file_disk, len),
257                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
258         if (rc != 0)
259                 GOTO(out, rc);
260
261         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
262         if (rc != 0)
263                 GOTO(out, rc);
264
265         rc = dt_xattr_set(env, obj,
266                           lfsck_buf_get(env, com->lc_file_disk, len),
267                           XATTR_NAME_LFSCK_NAMESPACE,
268                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
269                           handle, BYPASS_CAPA);
270
271         GOTO(out, rc);
272
273 out:
274         dt_trans_stop(env, lfsck->li_bottom, handle);
275
276 log:
277         if (rc != 0)
278                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
279                        lfsck_lfsck2name(lfsck), rc);
280         return rc;
281 }
282
283 static int lfsck_namespace_init(const struct lu_env *env,
284                                 struct lfsck_component *com)
285 {
286         struct lfsck_namespace *ns = com->lc_file_ram;
287         int rc;
288
289         memset(ns, 0, sizeof(*ns));
290         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
291         ns->ln_status = LS_INIT;
292         down_write(&com->lc_sem);
293         rc = lfsck_namespace_store(env, com, true);
294         up_write(&com->lc_sem);
295         return rc;
296 }
297
298 /**
299  * Update the namespace LFSCK tracing file for the given @fid
300  *
301  * \param[in] env       pointer to the thread context
302  * \param[in] com       pointer to the lfsck component
303  * \param[in] fid       the fid which flags to be updated in the lfsck
304  *                      tracing file
305  * \param[in] add       true if add new flags, otherwise remove flags
306  *
307  * \retval              0 for succeed or nothing to be done
308  * \retval              negative error number on failure
309  */
310 int lfsck_namespace_trace_update(const struct lu_env *env,
311                                  struct lfsck_component *com,
312                                  const struct lu_fid *fid,
313                                  const __u8 flags, bool add)
314 {
315         struct lfsck_instance   *lfsck  = com->lc_lfsck;
316         struct dt_object        *obj    = com->lc_obj;
317         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
318         struct dt_device        *dev    = lfsck->li_bottom;
319         struct thandle          *th     = NULL;
320         int                      rc     = 0;
321         __u8                     old    = 0;
322         __u8                     new    = 0;
323         ENTRY;
324
325         LASSERT(flags != 0);
326
327         down_write(&com->lc_sem);
328         fid_cpu_to_be(key, fid);
329         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
330                        (const struct dt_key *)key, BYPASS_CAPA);
331         if (rc == -ENOENT) {
332                 if (!add)
333                         GOTO(unlock, rc = 0);
334
335                 old = 0;
336                 new = flags;
337         } else if (rc == 0) {
338                 if (add) {
339                         if ((old & flags) == flags)
340                                 GOTO(unlock, rc = 0);
341
342                         new = old | flags;
343                 } else {
344                         if ((old & flags) == 0)
345                                 GOTO(unlock, rc = 0);
346
347                         new = old & ~flags;
348                 }
349         } else {
350                 GOTO(log, rc);
351         }
352
353         th = dt_trans_create(env, dev);
354         if (IS_ERR(th))
355                 GOTO(log, rc = PTR_ERR(th));
356
357         if (old != 0) {
358                 rc = dt_declare_delete(env, obj,
359                                        (const struct dt_key *)key, th);
360                 if (rc != 0)
361                         GOTO(log, rc);
362         }
363
364         if (new != 0) {
365                 rc = dt_declare_insert(env, obj,
366                                        (const struct dt_rec *)&new,
367                                        (const struct dt_key *)key, th);
368                 if (rc != 0)
369                         GOTO(log, rc);
370         }
371
372         rc = dt_trans_start_local(env, dev, th);
373         if (rc != 0)
374                 GOTO(log, rc);
375
376         if (old != 0) {
377                 rc = dt_delete(env, obj, (const struct dt_key *)key,
378                                th, BYPASS_CAPA);
379                 if (rc != 0)
380                         GOTO(log, rc);
381         }
382
383         if (new != 0) {
384                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
385                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
386                 if (rc != 0)
387                         GOTO(log, rc);
388         }
389
390         GOTO(log, rc);
391
392 log:
393         if (th != NULL && !IS_ERR(th))
394                 dt_trans_stop(env, dev, th);
395
396         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
397                "tracing file, flags %x, old %x, new %x: rc = %d\n",
398                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
399                (__u32)flags, (__u32)old, (__u32)new, rc);
400
401 unlock:
402         up_write(&com->lc_sem);
403
404         return rc;
405 }
406
407 static int lfsck_namespace_check_exist(const struct lu_env *env,
408                                        struct dt_object *dir,
409                                        struct dt_object *obj, const char *name)
410 {
411         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
412         int               rc;
413         ENTRY;
414
415         if (unlikely(lfsck_is_dead_obj(obj)))
416                 RETURN(LFSCK_NAMEENTRY_DEAD);
417
418         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
419                        (const struct dt_key *)name, BYPASS_CAPA);
420         if (rc == -ENOENT)
421                 RETURN(LFSCK_NAMEENTRY_REMOVED);
422
423         if (rc < 0)
424                 RETURN(rc);
425
426         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
427                 RETURN(LFSCK_NAMEENTRY_RECREATED);
428
429         RETURN(0);
430 }
431
432 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
433                                             struct dt_object *obj,
434                                             struct thandle *handle)
435 {
436         int rc;
437
438         /* For destroying all invalid linkEA entries. */
439         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
440         if (rc != 0)
441                 return rc;
442
443         /* For insert new linkEA entry. */
444         rc = dt_declare_xattr_set(env, obj,
445                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
446                         XATTR_NAME_LINK, 0, handle);
447         return rc;
448 }
449
450 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
451                        struct linkea_data *ldata)
452 {
453         int rc;
454
455         if (ldata->ld_buf->lb_buf == NULL)
456                 return -ENOMEM;
457
458         if (!dt_object_exists(obj))
459                 return -ENOENT;
460
461         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
462         if (rc == -ERANGE) {
463                 /* Buf was too small, figure out what we need. */
464                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
465                                   BYPASS_CAPA);
466                 if (rc <= 0)
467                         return rc;
468
469                 lu_buf_realloc(ldata->ld_buf, rc);
470                 if (ldata->ld_buf->lb_buf == NULL)
471                         return -ENOMEM;
472
473                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
474                                   BYPASS_CAPA);
475         }
476
477         if (rc > 0)
478                 rc = linkea_init(ldata);
479
480         return rc;
481 }
482
483 /**
484  * Remove linkEA for the given object.
485  *
486  * The caller should take the ldlm lock before the calling.
487  *
488  * \param[in] env       pointer to the thread context
489  * \param[in] com       pointer to the lfsck component
490  * \param[in] obj       pointer to the dt_object to be handled
491  *
492  * \retval              0 for repaired cases
493  * \retval              negative error number on failure
494  */
495 static int lfsck_namespace_links_remove(const struct lu_env *env,
496                                         struct lfsck_component *com,
497                                         struct dt_object *obj)
498 {
499         struct lfsck_instance           *lfsck  = com->lc_lfsck;
500         struct dt_device                *dev    = lfsck->li_bottom;
501         struct thandle                  *th     = NULL;
502         int                              rc     = 0;
503         ENTRY;
504
505         LASSERT(dt_object_remote(obj) == 0);
506
507         th = dt_trans_create(env, dev);
508         if (IS_ERR(th))
509                 GOTO(log, rc = PTR_ERR(th));
510
511         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
512         if (rc != 0)
513                 GOTO(stop, rc);
514
515         rc = dt_trans_start_local(env, dev, th);
516         if (rc != 0)
517                 GOTO(stop, rc);
518
519         dt_write_lock(env, obj, 0);
520         if (unlikely(lfsck_is_dead_obj(obj)))
521                 GOTO(unlock, rc = -ENOENT);
522
523         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
524                 GOTO(unlock, rc = 0);
525
526         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
527
528         GOTO(unlock, rc);
529
530 unlock:
531         dt_write_unlock(env, obj);
532
533 stop:
534         dt_trans_stop(env, dev, th);
535
536 log:
537         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
538                "for the object "DFID": rc = %d\n",
539                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
540
541         if (rc == 0) {
542                 struct lfsck_namespace *ns = com->lc_file_ram;
543
544                 ns->ln_flags |= LF_INCONSISTENT;
545         }
546
547         return rc;
548 }
549
550 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
551                              struct linkea_data *ldata, struct thandle *handle)
552 {
553         const struct lu_buf *buf = lfsck_buf_get_const(env,
554                                                        ldata->ld_buf->lb_buf,
555                                                        ldata->ld_leh->leh_len);
556
557         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
558                             BYPASS_CAPA);
559 }
560
561 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
562                                                 struct lu_name *cname,
563                                                 struct lu_fid *pfid,
564                                                 char *buf)
565 {
566         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
567         /* To guarantee the 'name' is terminated with '0'. */
568         memcpy(buf, cname->ln_name, cname->ln_namelen);
569         buf[cname->ln_namelen] = 0;
570         cname->ln_name = buf;
571 }
572
573 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
574                                                struct lu_name *cname,
575                                                struct lu_fid *pfid,
576                                                bool remove)
577 {
578         struct link_ea_entry    *oldlee;
579         int                      oldlen;
580         int                      repeated = 0;
581
582         oldlee = ldata->ld_lee;
583         oldlen = ldata->ld_reclen;
584         linkea_next_entry(ldata);
585         while (ldata->ld_lee != NULL) {
586                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
587                                    ldata->ld_lee->lee_reclen[1];
588                 if (unlikely(ldata->ld_reclen == oldlen &&
589                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
590                         repeated++;
591                         if (!remove)
592                                 break;
593
594                         linkea_del_buf(ldata, cname);
595                 } else {
596                         linkea_next_entry(ldata);
597                 }
598         }
599         ldata->ld_lee = oldlee;
600         ldata->ld_reclen = oldlen;
601
602         return repeated;
603 }
604
605 /**
606  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
607  *
608  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
609  * with the given type to generate the name, the detailed rules for name
610  * have been described as following.
611  *
612  * The function also generates the linkEA corresponding to the name entry
613  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
614  *
615  * \param[in] env       pointer to the thread context
616  * \param[in] com       pointer to the lfsck component
617  * \param[in] orphan    pointer to the orphan MDT-object
618  * \param[in] infix     additional information for the orphan name, such as
619  *                      the FID for original
620  * \param[in] type      the type for describing why the orphan MDT-object is
621  *                      created. The rules are as following:
622  *
623  *  type "D":           The MDT-object is a directory, it may know its parent
624  *                      but because there is no valid linkEA, the LFSCK cannot
625  *                      know where to put it back to the namespace.
626  *  type "O":           The MDT-object has no linkEA, and there is no name
627  *                      entry that references the MDT-object.
628  *
629  * \see lfsck_layout_recreate_parent() for more types.
630  *
631  * The orphan name will be like:
632  * ${FID}-${infix}-${type}-${conflict_version}
633  *
634  * \param[out] count    if some others inserted some linkEA entries by race,
635  *                      then return the linkEA entries count.
636  *
637  * \retval              positive number for repaired cases
638  * \retval              0 if needs to repair nothing
639  * \retval              negative error number on failure
640  */
641 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
642                                          struct lfsck_component *com,
643                                          struct dt_object *orphan,
644                                          const char *infix, const char *type,
645                                          int *count)
646 {
647         struct lfsck_thread_info        *info   = lfsck_env_info(env);
648         struct lu_name                  *cname  = &info->lti_name;
649         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
650         struct lu_fid                   *tfid   = &info->lti_fid5;
651         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
652         const struct lu_fid             *pfid;
653         struct lfsck_instance           *lfsck  = com->lc_lfsck;
654         struct dt_device                *dev    = lfsck->li_bottom;
655         struct dt_object                *parent;
656         struct thandle                  *th     = NULL;
657         struct lustre_handle             plh    = { 0 };
658         struct lustre_handle             clh    = { 0 };
659         struct linkea_data               ldata  = { 0 };
660         struct lu_buf                    linkea_buf;
661         int                              namelen;
662         int                              idx    = 0;
663         int                              rc     = 0;
664         bool                             exist  = false;
665         ENTRY;
666
667         cname->ln_name = NULL;
668         /* Create .lustre/lost+found/MDTxxxx when needed. */
669         if (unlikely(lfsck->li_lpf_obj == NULL)) {
670                 rc = lfsck_create_lpf(env, lfsck);
671                 if (rc != 0)
672                         GOTO(log, rc);
673         }
674
675         parent = lfsck->li_lpf_obj;
676         pfid = lfsck_dto2fid(parent);
677
678         /* Hold update lock on the parent to prevent others to access. */
679         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
680                               MDS_INODELOCK_UPDATE, LCK_EX);
681         if (rc != 0)
682                 GOTO(log, rc);
683
684         do {
685                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
686                                    PFID(cfid), infix, type, idx++);
687                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
688                                (const struct dt_key *)info->lti_key,
689                                BYPASS_CAPA);
690                 if (rc != 0 && rc != -ENOENT)
691                         GOTO(log, rc);
692
693                 if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
694                         exist = true;
695         } while (rc == 0 && !exist);
696
697         cname->ln_name = info->lti_key;
698         cname->ln_namelen = namelen;
699         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
700         if (rc != 0)
701                 GOTO(log, rc);
702
703         rc = linkea_add_buf(&ldata, cname, pfid);
704         if (rc != 0)
705                 GOTO(log, rc);
706
707         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
708                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
709                               LCK_EX);
710         if (rc != 0)
711                 GOTO(log, rc);
712
713         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
714                        ldata.ld_leh->leh_len);
715         th = dt_trans_create(env, dev);
716         if (IS_ERR(th))
717                 GOTO(log, rc = PTR_ERR(th));
718
719         if (S_ISDIR(lfsck_object_type(orphan))) {
720                 rc = dt_declare_delete(env, orphan,
721                                        (const struct dt_key *)dotdot, th);
722                 if (rc != 0)
723                         GOTO(stop, rc);
724
725                 rec->rec_type = S_IFDIR;
726                 rec->rec_fid = pfid;
727                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
728                                        (const struct dt_key *)dotdot, th);
729                 if (rc != 0)
730                         GOTO(stop, rc);
731         }
732
733         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
734                                   XATTR_NAME_LINK, 0, th);
735         if (rc != 0)
736                 GOTO(stop, rc);
737
738         if (!exist) {
739                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
740                 rec->rec_fid = cfid;
741                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
742                                        (const struct dt_key *)cname->ln_name,
743                                        th);
744                 if (rc != 0)
745                         GOTO(stop, rc);
746
747                 if (S_ISDIR(rec->rec_type)) {
748                         rc = dt_declare_ref_add(env, parent, th);
749                         if (rc != 0)
750                                 GOTO(stop, rc);
751                 }
752         }
753
754         rc = dt_trans_start_local(env, dev, th);
755         if (rc != 0)
756                 GOTO(stop, rc);
757
758         dt_write_lock(env, orphan, 0);
759         rc = lfsck_links_read(env, orphan, &ldata);
760         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
761                    (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
762                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
763                         GOTO(unlock, rc = 1);
764
765                 if (S_ISDIR(lfsck_object_type(orphan))) {
766                         rc = dt_delete(env, orphan,
767                                        (const struct dt_key *)dotdot, th,
768                                        BYPASS_CAPA);
769                         if (rc != 0)
770                                 GOTO(unlock, rc);
771
772                         rec->rec_type = S_IFDIR;
773                         rec->rec_fid = pfid;
774                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
775                                        (const struct dt_key *)dotdot, th,
776                                        BYPASS_CAPA, 1);
777                         if (rc != 0)
778                                 GOTO(unlock, rc);
779                 }
780
781                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
782                                   th, BYPASS_CAPA);
783         } else {
784                 if (rc == 0 && count != NULL)
785                         *count = ldata.ld_leh->leh_reccount;
786
787                 GOTO(unlock, rc);
788         }
789         dt_write_unlock(env, orphan);
790
791         if (rc == 0 && !exist) {
792                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
793                 rec->rec_fid = cfid;
794                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
795                                (const struct dt_key *)cname->ln_name,
796                                th, BYPASS_CAPA, 1);
797                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
798                         dt_write_lock(env, parent, 0);
799                         rc = dt_ref_add(env, parent, th);
800                         dt_write_unlock(env, parent);
801                 }
802         }
803
804         GOTO(stop, rc = (rc == 0 ? 1 : rc));
805
806 unlock:
807         dt_write_unlock(env, orphan);
808
809 stop:
810         dt_trans_stop(env, dev, th);
811
812 log:
813         lfsck_ibits_unlock(&clh, LCK_EX);
814         lfsck_ibits_unlock(&plh, LCK_EX);
815         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
816                "object "DFID", name = %s: rc = %d\n",
817                lfsck_lfsck2name(lfsck), PFID(cfid),
818                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
819
820         if (rc != 0) {
821                 struct lfsck_namespace *ns = com->lc_file_ram;
822
823                 ns->ln_flags |= LF_INCONSISTENT;
824         }
825
826         return rc;
827 }
828
829 /**
830  * Add the specified name entry back to namespace.
831  *
832  * If there is a linkEA entry that back references a name entry under
833  * some parent directory, but such parent directory does not have the
834  * claimed name entry. On the other hand, the linkEA entries count is
835  * not larger than the MDT-object's hard link count. Under such case,
836  * it is quite possible that the name entry is lost. Then the LFSCK
837  * should add the name entry back to the namespace.
838  *
839  * \param[in] env       pointer to the thread context
840  * \param[in] com       pointer to the lfsck component
841  * \param[in] parent    pointer to the directory under which the name entry
842  *                      will be inserted into
843  * \param[in] child     pointer to the object referenced by the name entry
844  *                      that to be inserted into the parent
845  * \param[in] name      the name for the child in the parent directory
846  *
847  * \retval              positive number for repaired cases
848  * \retval              0 if nothing to be repaired
849  * \retval              negative error number on failure
850  */
851 static int lfsck_namespace_insert_normal(const struct lu_env *env,
852                                          struct lfsck_component *com,
853                                          struct dt_object *parent,
854                                          struct dt_object *child,
855                                          const char *name)
856 {
857         struct lfsck_thread_info        *info   = lfsck_env_info(env);
858         struct lu_attr                  *la     = &info->lti_la;
859         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
860         struct lfsck_instance           *lfsck  = com->lc_lfsck;
861         struct dt_device                *dev    = lfsck->li_next;
862         struct thandle                  *th     = NULL;
863         struct lustre_handle             lh     = { 0 };
864         int                              rc     = 0;
865         ENTRY;
866
867         if (unlikely(!dt_try_as_dir(env, parent)))
868                 GOTO(log, rc = -ENOTDIR);
869
870         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
871                 GOTO(log, rc = 1);
872
873         /* Hold update lock on the parent to prevent others to access. */
874         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
875                               MDS_INODELOCK_UPDATE, LCK_EX);
876         if (rc != 0)
877                 GOTO(log, rc);
878
879         th = dt_trans_create(env, dev);
880         if (IS_ERR(th))
881                 GOTO(unlock, rc = PTR_ERR(th));
882
883         rec->rec_type = lfsck_object_type(child) & S_IFMT;
884         rec->rec_fid = lfsck_dto2fid(child);
885         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
886                                (const struct dt_key *)name, th);
887         if (rc != 0)
888                 GOTO(stop, rc);
889
890         if (S_ISDIR(rec->rec_type)) {
891                 rc = dt_declare_ref_add(env, parent, th);
892                 if (rc != 0)
893                         GOTO(stop, rc);
894         }
895
896         memset(la, 0, sizeof(*la));
897         la->la_ctime = cfs_time_current_sec();
898         la->la_valid = LA_CTIME;
899         rc = dt_declare_attr_set(env, parent, la, th);
900         if (rc != 0)
901                 GOTO(stop, rc);
902
903         rc = dt_trans_start_local(env, dev, th);
904         if (rc != 0)
905                 GOTO(stop, rc);
906
907         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
908                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
909         if (rc != 0)
910                 GOTO(stop, rc);
911
912         if (S_ISDIR(rec->rec_type)) {
913                 dt_write_lock(env, parent, 0);
914                 rc = dt_ref_add(env, parent, th);
915                 dt_write_unlock(env, parent);
916                 if (rc != 0)
917                         GOTO(stop, rc);
918         }
919
920         la->la_ctime = cfs_time_current_sec();
921         rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
922
923         GOTO(stop, rc = (rc == 0 ? 1 : rc));
924
925 stop:
926         dt_trans_stop(env, dev, th);
927
928 unlock:
929         lfsck_ibits_unlock(&lh, LCK_EX);
930
931 log:
932         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
933                "the name %s and type %o to the parent "DFID": rc = %d\n",
934                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
935                lfsck_object_type(child) & S_IFMT,
936                PFID(lfsck_dto2fid(parent)), rc);
937
938         if (rc != 0) {
939                 struct lfsck_namespace *ns = com->lc_file_ram;
940
941                 ns->ln_flags |= LF_INCONSISTENT;
942                 if (rc > 0)
943                         ns->ln_lost_dirent_repaired++;
944         }
945
946         return rc;
947 }
948
949 /**
950  * Create the specified orphan MDT-object on remote MDT.
951  *
952  * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
953  * ask the remote LFSCK instance to create the specified orphan object
954  * under .lustre/lost+found/MDTxxxx/ directory with the name:
955  * ${FID}-P-${conflict_version}.
956  *
957  * \param[in] env       pointer to the thread context
958  * \param[in] com       pointer to the lfsck component
959  * \param[in] orphan    pointer to the orphan MDT-object
960  * \param[in] type      the orphan's type to be created
961  *
962  *  type "P":           The orphan object to be created was a parent directory
963  *                      of some DMT-object which linkEA shows that the @orphan
964  *                      object is missing.
965  *
966  * \see lfsck_layout_recreate_parent() for more types.
967  *
968  * \retval              positive number for repaired cases
969  * \retval              0 if needs to repair nothing
970  * \retval              negative error number on failure
971  */
972 static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
973                                                 struct lfsck_component *com,
974                                                 struct dt_object *orphan,
975                                                 __u32 type)
976 {
977         struct lfsck_thread_info        *info   = lfsck_env_info(env);
978         struct lfsck_request            *lr     = &info->lti_lr;
979         struct lu_seq_range             *range  = &info->lti_range;
980         const struct lu_fid             *fid    = lfsck_dto2fid(orphan);
981         struct lfsck_namespace          *ns     = com->lc_file_ram;
982         struct lfsck_instance           *lfsck  = com->lc_lfsck;
983         struct seq_server_site          *ss     =
984                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
985         struct lfsck_tgt_desc           *ltd    = NULL;
986         struct ptlrpc_request           *req    = NULL;
987         int                              rc;
988         ENTRY;
989
990         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
991                 GOTO(out, rc = 1);
992
993         fld_range_set_mdt(range);
994         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
995         if (rc != 0)
996                 GOTO(out, rc);
997
998         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
999         if (ltd == NULL) {
1000                 ns->ln_flags |= LF_INCOMPLETE;
1001
1002                 GOTO(out, rc = -ENODEV);
1003         }
1004
1005         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1006                                    &RQF_LFSCK_NOTIFY);
1007         if (req == NULL)
1008                 GOTO(out, rc = -ENOMEM);
1009
1010         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1011         if (rc != 0) {
1012                 ptlrpc_request_free(req);
1013
1014                 GOTO(out, rc);
1015         }
1016
1017         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1018         memset(lr, 0, sizeof(*lr));
1019         lr->lr_event = LE_CREATE_ORPHAN;
1020         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1021         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1022         lr->lr_fid = *fid;
1023         lr->lr_type = type;
1024
1025         ptlrpc_request_set_replen(req);
1026         rc = ptlrpc_queue_wait(req);
1027         ptlrpc_req_finished(req);
1028
1029         if (rc == 0)
1030                 rc = 1;
1031         else if (rc == -EEXIST)
1032                 rc = 0;
1033
1034         GOTO(out, rc);
1035
1036 out:
1037         CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
1038                DFID" on the MDT %x remotely: rc = %d\n",
1039                lfsck_lfsck2name(lfsck), PFID(fid),
1040                ltd != NULL ? ltd->ltd_index : -1, rc);
1041
1042         if (ltd != NULL)
1043                 lfsck_tgt_put(ltd);
1044
1045         return rc;
1046 }
1047
1048 /**
1049  * Create the specified orphan MDT-object locally.
1050  *
1051  * For the case that the parent MDT-object stored in some MDT-object's
1052  * linkEA entry is lost, the LFSCK will re-create the parent object as
1053  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1054  * with the name ${FID}-P-${conflict_version}.
1055  *
1056  * \param[in] env       pointer to the thread context
1057  * \param[in] com       pointer to the lfsck component
1058  * \param[in] orphan    pointer to the orphan MDT-object to be created
1059  * \param[in] type      the orphan's type to be created
1060  *
1061  *  type "P":           The orphan object to be created was a parent directory
1062  *                      of some DMT-object which linkEA shows that the @orphan
1063  *                      object is missing.
1064  *
1065  * \see lfsck_layout_recreate_parent() for more types.
1066  *
1067  * \retval              positive number for repaired cases
1068  * \retval              negative error number on failure
1069  */
1070 static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
1071                                                struct lfsck_component *com,
1072                                                struct dt_object *orphan,
1073                                                __u32 type)
1074 {
1075         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1076         struct lu_attr                  *la     = &info->lti_la;
1077         struct dt_allocation_hint       *hint   = &info->lti_hint;
1078         struct dt_object_format         *dof    = &info->lti_dof;
1079         struct lu_name                  *cname  = &info->lti_name2;
1080         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1081         struct lu_fid                   *tfid   = &info->lti_fid;
1082         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1083         const struct lu_fid             *pfid;
1084         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1085         struct dt_device                *dev    = lfsck->li_bottom;
1086         struct dt_object                *parent = NULL;
1087         struct dt_object                *child  = NULL;
1088         struct thandle                  *th     = NULL;
1089         struct lustre_handle             lh     = { 0 };
1090         struct linkea_data               ldata  = { 0 };
1091         struct lu_buf                    linkea_buf;
1092         char                             name[32];
1093         int                              namelen;
1094         int                              idx    = 0;
1095         int                              rc     = 0;
1096         ENTRY;
1097
1098         LASSERT(!dt_object_exists(orphan));
1099         LASSERT(!dt_object_remote(orphan));
1100
1101         /* @orphan maybe not attached to lfsck->li_bottom */
1102         child = lfsck_object_find_by_dev(env, dev, cfid);
1103         if (IS_ERR(child))
1104                 GOTO(log, rc = PTR_ERR(child));
1105
1106         cname->ln_name = NULL;
1107         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1108                 GOTO(log, rc = 1);
1109
1110         /* Create .lustre/lost+found/MDTxxxx when needed. */
1111         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1112                 rc = lfsck_create_lpf(env, lfsck);
1113                 if (rc != 0)
1114                         GOTO(log, rc);
1115         }
1116
1117         parent = lfsck->li_lpf_obj;
1118         pfid = lfsck_dto2fid(parent);
1119
1120         /* Hold update lock on the parent to prevent others to access. */
1121         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1122                               MDS_INODELOCK_UPDATE, LCK_EX);
1123         if (rc != 0)
1124                 GOTO(log, rc);
1125
1126         do {
1127                 namelen = snprintf(name, 31, DFID"-P-%d",
1128                                    PFID(cfid), idx++);
1129                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1130                                (const struct dt_key *)name, BYPASS_CAPA);
1131                 if (rc != 0 && rc != -ENOENT)
1132                         GOTO(unlock1, rc);
1133         } while (rc == 0);
1134
1135         cname->ln_name = name;
1136         cname->ln_namelen = namelen;
1137
1138         memset(la, 0, sizeof(*la));
1139         la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
1140         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1141                        LA_ATIME | LA_MTIME | LA_CTIME;
1142
1143         child->do_ops->do_ah_init(env, hint, parent, child,
1144                                   la->la_mode & S_IFMT);
1145
1146         memset(dof, 0, sizeof(*dof));
1147         dof->dof_type = dt_mode_to_dft(type);
1148
1149         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1150         if (rc != 0)
1151                 GOTO(unlock1, rc);
1152
1153         rc = linkea_add_buf(&ldata, cname, pfid);
1154         if (rc != 0)
1155                 GOTO(unlock1, rc);
1156
1157         th = dt_trans_create(env, dev);
1158         if (IS_ERR(th))
1159                 GOTO(unlock1, rc = PTR_ERR(th));
1160
1161         rc = dt_declare_create(env, child, la, hint, dof, th);
1162         if (rc == 0 && S_ISDIR(type))
1163                 rc = dt_declare_ref_add(env, child, th);
1164
1165         if (rc != 0)
1166                 GOTO(stop, rc);
1167
1168         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1169                        ldata.ld_leh->leh_len);
1170         rc = dt_declare_xattr_set(env, child, &linkea_buf,
1171                                   XATTR_NAME_LINK, 0, th);
1172         if (rc != 0)
1173                 GOTO(stop, rc);
1174
1175         rec->rec_type = type;
1176         rec->rec_fid = cfid;
1177         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1178                                (const struct dt_key *)name, th);
1179         if (rc == 0 && S_ISDIR(type))
1180                 rc = dt_declare_ref_add(env, parent, th);
1181
1182         if (rc != 0)
1183                 GOTO(stop, rc);
1184
1185         rc = dt_trans_start_local(env, dev, th);
1186         if (rc != 0)
1187                 GOTO(stop, rc);
1188
1189         dt_write_lock(env, child, 0);
1190         rc = dt_create(env, child, la, hint, dof, th);
1191         if (rc != 0)
1192                 GOTO(unlock2, rc);
1193
1194         if (S_ISDIR(type)) {
1195                 if (unlikely(!dt_try_as_dir(env, child)))
1196                         GOTO(unlock2, rc = -ENOTDIR);
1197
1198                 rec->rec_type = S_IFDIR;
1199                 rec->rec_fid = cfid;
1200                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1201                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
1202                 if (rc != 0)
1203                         GOTO(unlock2, rc);
1204
1205                 rec->rec_fid = pfid;
1206                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1207                                (const struct dt_key *)dotdot, th,
1208                                BYPASS_CAPA, 1);
1209                 if (rc != 0)
1210                         GOTO(unlock2, rc);
1211
1212                 rc = dt_ref_add(env, child, th);
1213                 if (rc != 0)
1214                         GOTO(unlock2, rc);
1215         }
1216
1217         rc = dt_xattr_set(env, child, &linkea_buf,
1218                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1219         dt_write_unlock(env, child);
1220         if (rc != 0)
1221                 GOTO(stop, rc);
1222
1223         rec->rec_type = type;
1224         rec->rec_fid = cfid;
1225         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1226                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1227         if (rc == 0 && S_ISDIR(type)) {
1228                 dt_write_lock(env, parent, 0);
1229                 rc = dt_ref_add(env, parent, th);
1230                 dt_write_unlock(env, parent);
1231         }
1232
1233         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1234
1235 unlock2:
1236         dt_write_unlock(env, child);
1237
1238 stop:
1239         dt_trans_stop(env, dev, th);
1240
1241 unlock1:
1242         lfsck_ibits_unlock(&lh, LCK_EX);
1243
1244 log:
1245         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
1246                "the object "DFID", name = %s, type %o: rc = %d\n",
1247                lfsck_lfsck2name(lfsck), PFID(cfid),
1248                cname->ln_name != NULL ? cname->ln_name : "<NULL>", type, rc);
1249
1250         if (child != NULL && !IS_ERR(child))
1251                 lfsck_object_put(env, child);
1252
1253         return rc;
1254 }
1255
1256 /**
1257  * Create the specified orphan MDT-object.
1258  *
1259  * For the case that the parent MDT-object stored in some MDT-object's
1260  * linkEA entry is lost, the LFSCK will re-create the parent object as
1261  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1262  * with the name: ${FID}-P-${conflict_version}.
1263  *
1264  * \param[in] env       pointer to the thread context
1265  * \param[in] com       pointer to the lfsck component
1266  * \param[in] orphan    pointer to the orphan MDT-object
1267  *
1268  *  type "P":           The orphan object to be created was a parent directory
1269  *                      of some DMT-object which linkEA shows that the @orphan
1270  *                      object is missing.
1271  *
1272  * \see lfsck_layout_recreate_parent() for more types.
1273  *
1274  * \retval              positive number for repaired cases
1275  * \retval              0 if needs to repair nothing
1276  * \retval              negative error number on failure
1277  */
1278 static int lfsck_namespace_create_orphan(const struct lu_env *env,
1279                                          struct lfsck_component *com,
1280                                          struct dt_object *orphan)
1281 {
1282         struct lfsck_namespace *ns = com->lc_file_ram;
1283         int                     rc;
1284
1285         if (dt_object_remote(orphan))
1286                 rc = lfsck_namespace_create_orphan_remote(env, com, orphan,
1287                                                           S_IFDIR);
1288         else
1289                 rc = lfsck_namespace_create_orphan_local(env, com, orphan,
1290                                                          S_IFDIR);
1291
1292         if (rc != 0)
1293                 ns->ln_flags |= LF_INCONSISTENT;
1294
1295         return rc;
1296 }
1297
1298 /**
1299  * Remove the specified entry from the linkEA.
1300  *
1301  * Locate the linkEA entry with the given @cname and @pfid, then
1302  * remove this entry or the other entries those are repeated with
1303  * this entry.
1304  *
1305  * \param[in] env       pointer to the thread context
1306  * \param[in] com       pointer to the lfsck component
1307  * \param[in] obj       pointer to the dt_object to be handled
1308  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1309  * \param[in] cname     the name for the child in the parent directory
1310  * \param[in] pfid      the parent directory's FID for the linkEA
1311  * \param[in] next      if true, then remove the first found linkEA
1312  *                      entry, and move the ldata->ld_lee to next entry
1313  *
1314  * \retval              positive number for repaired cases
1315  * \retval              0 if nothing to be repaired
1316  * \retval              negative error number on failure
1317  */
1318 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1319                                          struct lfsck_component *com,
1320                                          struct dt_object *obj,
1321                                          struct linkea_data *ldata,
1322                                          struct lu_name *cname,
1323                                          struct lu_fid *pfid,
1324                                          bool next)
1325 {
1326         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1327         struct dt_device                *dev       = lfsck->li_bottom;
1328         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1329         struct thandle                  *th        = NULL;
1330         struct lustre_handle             lh        = { 0 };
1331         struct linkea_data               ldata_new = { 0 };
1332         struct lu_buf                    linkea_buf;
1333         int                              rc        = 0;
1334         ENTRY;
1335
1336         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1337                               MDS_INODELOCK_UPDATE |
1338                               MDS_INODELOCK_XATTR, LCK_EX);
1339         if (rc != 0)
1340                 GOTO(log, rc);
1341
1342         if (next)
1343                 linkea_del_buf(ldata, cname);
1344         else
1345                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1346                                                     true);
1347         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1348                        ldata->ld_leh->leh_len);
1349
1350 again:
1351         th = dt_trans_create(env, dev);
1352         if (IS_ERR(th))
1353                 GOTO(unlock1, rc = PTR_ERR(th));
1354
1355         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1356                                   XATTR_NAME_LINK, 0, th);
1357         if (rc != 0)
1358                 GOTO(stop, rc);
1359
1360         rc = dt_trans_start_local(env, dev, th);
1361         if (rc != 0)
1362                 GOTO(stop, rc);
1363
1364         dt_write_lock(env, obj, 0);
1365         if (unlikely(lfsck_is_dead_obj(obj)))
1366                 GOTO(unlock2, rc = -ENOENT);
1367
1368         rc = lfsck_links_read2(env, obj, &ldata_new);
1369         if (rc != 0)
1370                 GOTO(unlock2, rc);
1371
1372         /* The specified linkEA entry has been removed by race. */
1373         rc = linkea_links_find(&ldata_new, cname, pfid);
1374         if (rc != 0)
1375                 GOTO(unlock2, rc = 0);
1376
1377         if (bk->lb_param & LPF_DRYRUN)
1378                 GOTO(unlock2, rc = 1);
1379
1380         if (next)
1381                 linkea_del_buf(&ldata_new, cname);
1382         else
1383                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1384                                                     true);
1385
1386         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1387                 dt_write_unlock(env, obj);
1388                 dt_trans_stop(env, dev, th);
1389                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1390                                ldata_new.ld_leh->leh_len);
1391                 goto again;
1392         }
1393
1394         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1395                        ldata_new.ld_leh->leh_len);
1396         rc = dt_xattr_set(env, obj, &linkea_buf,
1397                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1398
1399         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1400
1401 unlock2:
1402         dt_write_unlock(env, obj);
1403
1404 stop:
1405         dt_trans_stop(env, dev, th);
1406
1407 unlock1:
1408         lfsck_ibits_unlock(&lh, LCK_EX);
1409
1410 log:
1411         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1412                "for the object: "DFID", parent "DFID", name %.*s\n",
1413                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1414                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1415                cname->ln_name);
1416
1417         if (rc != 0) {
1418                 struct lfsck_namespace *ns = com->lc_file_ram;
1419
1420                 ns->ln_flags |= LF_INCONSISTENT;
1421         }
1422
1423         return rc;
1424 }
1425
1426 /**
1427  * Conditionally remove the specified entry from the linkEA.
1428  *
1429  * Take the parent lock firstly, then check whether the specified
1430  * name entry exists or not: if yes, do nothing; otherwise, call
1431  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1432  *
1433  * \param[in] env       pointer to the thread context
1434  * \param[in] com       pointer to the lfsck component
1435  * \param[in] parent    pointer to the parent directory
1436  * \param[in] child     pointer to the child object that holds the linkEA
1437  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1438  * \param[in] cname     the name for the child in the parent directory
1439  * \param[in] pfid      the parent directory's FID for the linkEA
1440  *
1441  * \retval              positive number for repaired cases
1442  * \retval              0 if nothing to be repaired
1443  * \retval              negative error number on failure
1444  */
1445 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1446                                               struct lfsck_component *com,
1447                                               struct dt_object *parent,
1448                                               struct dt_object *child,
1449                                               struct linkea_data *ldata,
1450                                               struct lu_name *cname,
1451                                               struct lu_fid *pfid)
1452 {
1453         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
1454         struct lustre_handle     lh     = { 0 };
1455         int                      rc;
1456         ENTRY;
1457
1458         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1459                               MDS_INODELOCK_UPDATE, LCK_EX);
1460         if (rc != 0)
1461                 RETURN(rc);
1462
1463         dt_read_lock(env, parent, 0);
1464         if (unlikely(lfsck_is_dead_obj(parent))) {
1465                 dt_read_unlock(env, parent);
1466                 lfsck_ibits_unlock(&lh, LCK_EX);
1467                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1468                                                    cname, pfid, true);
1469
1470                 RETURN(rc);
1471         }
1472
1473         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1474                        (const struct dt_key *)cname->ln_name,
1475                        BYPASS_CAPA);
1476         dt_read_unlock(env, parent);
1477
1478         /* It is safe to release the ldlm lock, because when the logic come
1479          * here, we have got all the needed information above whether the
1480          * linkEA entry is valid or not. It is not important that others
1481          * may add new linkEA entry after the ldlm lock released. If other
1482          * has removed the specified linkEA entry by race, then it is OK,
1483          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1484          * such case. */
1485         lfsck_ibits_unlock(&lh, LCK_EX);
1486         if (rc == -ENOENT) {
1487                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1488                                                    cname, pfid, true);
1489
1490                 RETURN(rc);
1491         }
1492
1493         if (rc != 0)
1494                 RETURN(rc);
1495
1496         /* The LFSCK just found some internal status of cross-MDTs
1497          * create operation. That is normal. */
1498         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1499                 linkea_next_entry(ldata);
1500
1501                 RETURN(0);
1502         }
1503
1504         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1505                                            pfid, true);
1506
1507         RETURN(rc);
1508 }
1509
1510 /**
1511  * Conditionally replace name entry in the parent.
1512  *
1513  * As required, the LFSCK may re-create the lost MDT-object for dangling
1514  * name entry, but such repairing may be wrong because of bad FID in the
1515  * name entry. As the LFSCK processing, the real MDT-object may be found,
1516  * then the LFSCK should check whether the former re-created MDT-object
1517  * has been modified or not, if not, then destroy it and update the name
1518  * entry in the parent to reference the real MDT-object.
1519  *
1520  * \param[in] env       pointer to the thread context
1521  * \param[in] com       pointer to the lfsck component
1522  * \param[in] parent    pointer to the parent directory
1523  * \param[in] child     pointer to the MDT-object that may be the real
1524  *                      MDT-object corresponding to the name entry in parent
1525  * \param[in] cfid      the current FID in the name entry
1526  * \param[in] cname     contains the name of the child in the parent directory
1527  *
1528  * \retval              positive number for repaired cases
1529  * \retval              0 if nothing to be repaired
1530  * \retval              negative error number on failure
1531  */
1532 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1533                                         struct lfsck_component *com,
1534                                         struct dt_object *parent,
1535                                         struct dt_object *child,
1536                                         const struct lu_fid *cfid,
1537                                         const struct lu_name *cname)
1538 {
1539         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1540         struct lu_fid                   *tfid   = &info->lti_fid5;
1541         struct lu_attr                  *la     = &info->lti_la;
1542         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1543         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1544         struct dt_device                *dev    = lfsck->li_next;
1545         const char                      *name   = cname->ln_name;
1546         struct dt_object                *obj    = NULL;
1547         struct lustre_handle             plh    = { 0 };
1548         struct lustre_handle             clh    = { 0 };
1549         struct linkea_data               ldata  = { 0 };
1550         struct thandle                  *th     = NULL;
1551         bool                             exist  = true;
1552         int                              rc     = 0;
1553         ENTRY;
1554
1555         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1556                               MDS_INODELOCK_UPDATE, LCK_EX);
1557         if (rc != 0)
1558                 GOTO(log, rc);
1559
1560         if (!fid_is_sane(cfid)) {
1561                 exist = false;
1562                 goto replace;
1563         }
1564
1565         obj = lfsck_object_find(env, lfsck, cfid);
1566         if (IS_ERR(obj)) {
1567                 rc = PTR_ERR(obj);
1568                 if (rc == -ENOENT) {
1569                         exist = false;
1570                         goto replace;
1571                 }
1572
1573                 GOTO(log, rc);
1574         }
1575
1576         if (!dt_object_exists(obj)) {
1577                 exist = false;
1578                 goto replace;
1579         }
1580
1581         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1582                        (const struct dt_key *)name, BYPASS_CAPA);
1583         if (rc == -ENOENT) {
1584                 exist = false;
1585                 goto replace;
1586         }
1587
1588         if (rc != 0)
1589                 GOTO(log, rc);
1590
1591         /* Someone changed the name entry, cannot replace it. */
1592         if (!lu_fid_eq(cfid, tfid))
1593                 GOTO(log, rc = 0);
1594
1595         /* lock the object to be destroyed. */
1596         rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1597                               MDS_INODELOCK_UPDATE |
1598                               MDS_INODELOCK_XATTR, LCK_EX);
1599         if (rc != 0)
1600                 GOTO(log, rc);
1601
1602         if (unlikely(lfsck_is_dead_obj(obj))) {
1603                 exist = false;
1604                 goto replace;
1605         }
1606
1607         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1608         if (rc != 0)
1609                 GOTO(log, rc);
1610
1611         /* The object has been modified by other(s), or it is not created by
1612          * LFSCK, the two cases are indistinguishable. So cannot replace it. */
1613         if (la->la_ctime != 0)
1614                 GOTO(log, rc);
1615
1616         if (S_ISREG(la->la_mode)) {
1617                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1618                                   BYPASS_CAPA);
1619                 /* If someone has created related OST-object(s),
1620                  * then keep it. */
1621                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1622                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1623         }
1624
1625 replace:
1626         dt_read_lock(env, child, 0);
1627         rc = lfsck_links_read2(env, child, &ldata);
1628         dt_read_unlock(env, child);
1629
1630         /* Someone changed the child, no need to replace. */
1631         if (rc == -ENODATA)
1632                 GOTO(log, rc = 0);
1633
1634         if (rc != 0)
1635                 GOTO(log, rc);
1636
1637         rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1638         /* Someone moved the child, no need to replace. */
1639         if (rc != 0)
1640                 GOTO(log, rc = 0);
1641
1642         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1643                 GOTO(log, rc = 1);
1644
1645         th = dt_trans_create(env, dev);
1646         if (IS_ERR(th))
1647                 GOTO(log, rc = PTR_ERR(th));
1648
1649         if (exist) {
1650                 rc = dt_declare_destroy(env, obj, th);
1651                 if (rc != 0)
1652                         GOTO(stop, rc);
1653         }
1654
1655         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1656         if (rc != 0)
1657                 GOTO(stop, rc);
1658
1659         rec->rec_type = S_IFDIR;
1660         rec->rec_fid = lfsck_dto2fid(child);
1661         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1662                                (const struct dt_key *)name, th);
1663         if (rc != 0)
1664                 GOTO(stop, rc);
1665
1666         rc = dt_trans_start(env, dev, th);
1667         if (rc != 0)
1668                 GOTO(stop, rc);
1669
1670         if (exist) {
1671                 rc = dt_destroy(env, obj, th);
1672                 if (rc != 0)
1673                         GOTO(stop, rc);
1674         }
1675
1676         /* The old name entry maybe not exist. */
1677         dt_delete(env, parent, (const struct dt_key *)name, th,
1678                   BYPASS_CAPA);
1679
1680         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1681                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1682
1683         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1684
1685 stop:
1686         dt_trans_stop(env, dev, th);
1687
1688 log:
1689         lfsck_ibits_unlock(&clh, LCK_EX);
1690         lfsck_ibits_unlock(&plh, LCK_EX);
1691         if (obj != NULL && !IS_ERR(obj))
1692                 lfsck_object_put(env, obj);
1693
1694         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1695                "object "DFID" because of conflict with the object "DFID
1696                " under the parent "DFID" with name %s: rc = %d\n",
1697                lfsck_lfsck2name(lfsck), PFID(cfid),
1698                PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1699                name, rc);
1700
1701         return rc;
1702 }
1703
1704 /**
1705  * Overwrite the linkEA for the object with the given ldata.
1706  *
1707  * The caller should take the ldlm lock before the calling.
1708  *
1709  * \param[in] env       pointer to the thread context
1710  * \param[in] com       pointer to the lfsck component
1711  * \param[in] obj       pointer to the dt_object to be handled
1712  * \param[in] ldata     pointer to the new linkEA data
1713  *
1714  * \retval              positive number for repaired cases
1715  * \retval              0 if nothing to be repaired
1716  * \retval              negative error number on failure
1717  */
1718 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1719                                    struct lfsck_component *com,
1720                                    struct dt_object *obj,
1721                                    struct linkea_data *ldata)
1722 {
1723         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1724         struct dt_device                *dev    = lfsck->li_bottom;
1725         struct thandle                  *th     = NULL;
1726         struct lu_buf                    linkea_buf;
1727         int                              rc     = 0;
1728         ENTRY;
1729
1730         LASSERT(!dt_object_remote(obj));
1731
1732         th = dt_trans_create(env, dev);
1733         if (IS_ERR(th))
1734                 GOTO(log, rc = PTR_ERR(th));
1735
1736         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1737                        ldata->ld_leh->leh_len);
1738         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1739                                   XATTR_NAME_LINK, 0, th);
1740         if (rc != 0)
1741                 GOTO(stop, rc);
1742
1743         rc = dt_trans_start_local(env, dev, th);
1744         if (rc != 0)
1745                 GOTO(stop, rc);
1746
1747         dt_write_lock(env, obj, 0);
1748         if (unlikely(lfsck_is_dead_obj(obj)))
1749                 GOTO(unlock, rc = 0);
1750
1751         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1752                 GOTO(unlock, rc = 1);
1753
1754         rc = dt_xattr_set(env, obj, &linkea_buf,
1755                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1756
1757         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1758
1759 unlock:
1760         dt_write_unlock(env, obj);
1761
1762 stop:
1763         dt_trans_stop(env, dev, th);
1764
1765 log:
1766         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1767                "object "DFID": rc = %d\n",
1768                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1769
1770         if (rc != 0) {
1771                 struct lfsck_namespace *ns = com->lc_file_ram;
1772
1773                 ns->ln_flags |= LF_INCONSISTENT;
1774         }
1775
1776         return rc;
1777 }
1778
1779 /**
1780  * Repair invalid name entry.
1781  *
1782  * If the name entry contains invalid information, such as bad file type
1783  * or (and) corrupted object FID, then either remove the name entry or
1784  * udpate the name entry with the given (right) information.
1785  *
1786  * \param[in] env       pointer to the thread context
1787  * \param[in] com       pointer to the lfsck component
1788  * \param[in] parent    pointer to the parent directory
1789  * \param[in] child     pointer to the object referenced by the name entry
1790  * \param[in] name      the old name of the child under the parent directory
1791  * \param[in] name2     the new name of the child under the parent directory
1792  * \param[in] type      the type claimed by the name entry
1793  * \param[in] update    update the name entry if true; otherwise, remove it
1794  * \param[in] dec       decrease the parent nlink count if true
1795  *
1796  * \retval              positive number for repaired successfully
1797  * \retval              0 if nothing to be repaired
1798  * \retval              negative error number on failure
1799  */
1800 int lfsck_namespace_repair_dirent(const struct lu_env *env,
1801                                   struct lfsck_component *com,
1802                                   struct dt_object *parent,
1803                                   struct dt_object *child,
1804                                   const char *name, const char *name2,
1805                                   __u16 type, bool update, bool dec)
1806 {
1807         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1808         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1809         const struct lu_fid             *cfid   = lfsck_dto2fid(child);
1810         struct lu_fid                   *tfid   = &info->lti_fid5;
1811         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1812         struct dt_device                *dev    = lfsck->li_next;
1813         struct thandle                  *th     = NULL;
1814         struct lustre_handle             lh     = { 0 };
1815         int                              rc     = 0;
1816         ENTRY;
1817
1818         if (unlikely(!dt_try_as_dir(env, parent)))
1819                 GOTO(log, rc = -ENOTDIR);
1820
1821         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1822                               MDS_INODELOCK_UPDATE, LCK_EX);
1823         if (rc != 0)
1824                 GOTO(log, rc);
1825
1826         th = dt_trans_create(env, dev);
1827         if (IS_ERR(th))
1828                 GOTO(unlock1, rc = PTR_ERR(th));
1829
1830         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1831         if (rc != 0)
1832                 GOTO(stop, rc);
1833
1834         if (update) {
1835                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
1836                 rec->rec_fid = cfid;
1837                 rc = dt_declare_insert(env, parent,
1838                                        (const struct dt_rec *)rec,
1839                                        (const struct dt_key *)name2, th);
1840                 if (rc != 0)
1841                         GOTO(stop, rc);
1842         }
1843
1844         if (dec) {
1845                 rc = dt_declare_ref_del(env, parent, th);
1846                 if (rc != 0)
1847                         GOTO(stop, rc);
1848         }
1849
1850         rc = dt_trans_start(env, dev, th);
1851         if (rc != 0)
1852                 GOTO(stop, rc);
1853
1854         dt_write_lock(env, parent, 0);
1855         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1856                        (const struct dt_key *)name, BYPASS_CAPA);
1857         /* Someone has removed the bad name entry by race. */
1858         if (rc == -ENOENT)
1859                 GOTO(unlock2, rc = 0);
1860
1861         if (rc != 0)
1862                 GOTO(unlock2, rc);
1863
1864         /* Someone has removed the bad name entry and reused it for other
1865          * object by race. */
1866         if (!lu_fid_eq(tfid, cfid))
1867                 GOTO(unlock2, rc = 0);
1868
1869         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1870                 GOTO(unlock2, rc = 1);
1871
1872         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
1873                        BYPASS_CAPA);
1874         if (rc != 0)
1875                 GOTO(unlock2, rc);
1876
1877         if (update) {
1878                 rc = dt_insert(env, parent,
1879                                (const struct dt_rec *)rec,
1880                                (const struct dt_key *)name2, th,
1881                                BYPASS_CAPA, 1);
1882                 if (rc != 0)
1883                         GOTO(unlock2, rc);
1884         }
1885
1886         if (dec) {
1887                 rc = dt_ref_del(env, parent, th);
1888                 if (rc != 0)
1889                         GOTO(unlock2, rc);
1890         }
1891
1892         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1893
1894 unlock2:
1895         dt_write_unlock(env, parent);
1896
1897 stop:
1898         dt_trans_stop(env, dev, th);
1899
1900         /* We are not sure whether the child will become orphan or not.
1901          * Record it in the LFSCK tracing file for further checking in
1902          * the second-stage scanning. */
1903         if (!update && !dec && rc == 0)
1904                 lfsck_namespace_trace_update(env, com, cfid,
1905                                              LNTF_CHECK_LINKEA, true);
1906
1907 unlock1:
1908         lfsck_ibits_unlock(&lh, LCK_EX);
1909
1910 log:
1911         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
1912                "entry for: parent "DFID", child "DFID", name %s, type "
1913                "in name entry %o, type claimed by child %o. repair it "
1914                "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
1915                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
1916                name, type, update ? lfsck_object_type(child) : 0,
1917                update ? "updating" : "removing", name2, rc);
1918
1919         if (rc != 0) {
1920                 struct lfsck_namespace *ns = com->lc_file_ram;
1921
1922                 ns->ln_flags |= LF_INCONSISTENT;
1923         }
1924
1925         return rc;
1926 }
1927
1928 /**
1929  * Update the ".." name entry for the given object.
1930  *
1931  * The object's ".." is corrupted, this function will update the ".." name
1932  * entry with the given pfid, and the linkEA with the given ldata.
1933  *
1934  * The caller should take the ldlm lock before the calling.
1935  *
1936  * \param[in] env       pointer to the thread context
1937  * \param[in] com       pointer to the lfsck component
1938  * \param[in] obj       pointer to the dt_object to be handled
1939  * \param[in] pfid      the new fid for the object's ".." name entry
1940  * \param[in] cname     the name for the @obj in the parent directory
1941  *
1942  * \retval              positive number for repaired cases
1943  * \retval              0 if nothing to be repaired
1944  * \retval              negative error number on failure
1945  */
1946 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
1947                                                   struct lfsck_component *com,
1948                                                   struct dt_object *obj,
1949                                                   const struct lu_fid *pfid,
1950                                                   struct lu_name *cname)
1951 {
1952         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1953         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1954         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1955         struct dt_device                *dev    = lfsck->li_bottom;
1956         struct thandle                  *th     = NULL;
1957         struct linkea_data               ldata  = { 0 };
1958         struct lu_buf                    linkea_buf;
1959         int                              rc     = 0;
1960         ENTRY;
1961
1962         LASSERT(!dt_object_remote(obj));
1963         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1964
1965         rc = linkea_data_new(&ldata, &info->lti_big_buf);
1966         if (rc != 0)
1967                 GOTO(log, rc);
1968
1969         rc = linkea_add_buf(&ldata, cname, pfid);
1970         if (rc != 0)
1971                 GOTO(log, rc);
1972
1973         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1974                        ldata.ld_leh->leh_len);
1975
1976         th = dt_trans_create(env, dev);
1977         if (IS_ERR(th))
1978                 GOTO(log, rc = PTR_ERR(th));
1979
1980         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
1981         if (rc != 0)
1982                 GOTO(stop, rc);
1983
1984         rec->rec_type = S_IFDIR;
1985         rec->rec_fid = pfid;
1986         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
1987                                (const struct dt_key *)dotdot, th);
1988         if (rc != 0)
1989                 GOTO(stop, rc);
1990
1991         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1992                                   XATTR_NAME_LINK, 0, th);
1993         if (rc != 0)
1994                 GOTO(stop, rc);
1995
1996         rc = dt_trans_start_local(env, dev, th);
1997         if (rc != 0)
1998                 GOTO(stop, rc);
1999
2000         dt_write_lock(env, obj, 0);
2001         if (unlikely(lfsck_is_dead_obj(obj)))
2002                 GOTO(unlock, rc = 0);
2003
2004         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2005                 GOTO(unlock, rc = 1);
2006
2007         /* The old ".." name entry maybe not exist. */
2008         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
2009                   BYPASS_CAPA);
2010
2011         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2012                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
2013         if (rc != 0)
2014                 GOTO(unlock, rc);
2015
2016         rc = dt_xattr_set(env, obj, &linkea_buf,
2017                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2018
2019         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2020
2021 unlock:
2022         dt_write_unlock(env, obj);
2023
2024 stop:
2025         dt_trans_stop(env, dev, th);
2026
2027 log:
2028         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2029                "the object "DFID", new parent "DFID": rc = %d\n",
2030                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2031                PFID(pfid), rc);
2032
2033         if (rc != 0) {
2034                 struct lfsck_namespace *ns = com->lc_file_ram;
2035
2036                 ns->ln_flags |= LF_INCONSISTENT;
2037         }
2038
2039         return rc;
2040 }
2041
2042 /**
2043  * Handle orphan @obj during Double Scan Directory.
2044  *
2045  * Remove the @obj's current (invalid) linkEA entries, and insert
2046  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2047  * ${FID}-${PFID}-D-${conflict_version}
2048  *
2049  * The caller should take the ldlm lock before the calling.
2050  *
2051  * \param[in] env       pointer to the thread context
2052  * \param[in] com       pointer to the lfsck component
2053  * \param[in] obj       pointer to the orphan object to be handled
2054  * \param[in] pfid      the new fid for the object's ".." name entry
2055  * \param[in,out] lh    ldlm lock handler for the given @obj
2056  * \param[out] type     to tell the caller what the inconsistency is
2057  *
2058  * \retval              positive number for repaired cases
2059  * \retval              0 if nothing to be repaired
2060  * \retval              negative error number on failure
2061  */
2062 static int
2063 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2064                            struct lfsck_component *com,
2065                            struct dt_object *obj,
2066                            const struct lu_fid *pfid,
2067                            struct lustre_handle *lh,
2068                            enum lfsck_namespace_inconsistency_type *type)
2069 {
2070         struct lfsck_thread_info *info = lfsck_env_info(env);
2071         int                       rc;
2072         ENTRY;
2073
2074         /* Remove the unrecognized linkEA. */
2075         rc = lfsck_namespace_links_remove(env, com, obj);
2076         lfsck_ibits_unlock(lh, LCK_EX);
2077         if (rc < 0 && rc != -ENODATA)
2078                 RETURN(rc);
2079
2080         *type = LNIT_MUL_REF;
2081         /* The unique linkEA is invalid, even if the ".." name entry may be
2082          * valid, we still cannot know via which name entry this directory
2083          * will be referenced. Then handle it as pure orphan. */
2084         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2085                  "-"DFID, PFID(pfid));
2086         rc = lfsck_namespace_insert_orphan(env, com, obj,
2087                                            info->lti_tmpbuf, "D", NULL);
2088
2089         RETURN(rc);
2090 }
2091
2092 /**
2093  * Double Scan Directory object for single linkEA entry case.
2094  *
2095  * The given @child has unique linkEA entry. If the linkEA entry is valid,
2096  * then check whether the name is in the namespace or not, if not, add the
2097  * missing name entry back to namespace. If the linkEA entry is invalid,
2098  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2099  * as an orphan.
2100  *
2101  * \param[in] env       pointer to the thread context
2102  * \param[in] com       pointer to the lfsck component
2103  * \param[in] child     pointer to the directory to be double scanned
2104  * \param[in] pfid      the FID corresponding to the ".." entry
2105  * \param[in] ldata     pointer to the linkEA data for the given @child
2106  * \param[in,out] lh    ldlm lock handler for the given @child
2107  * \param[out] type     to tell the caller what the inconsistency is
2108  * \param[in] retry     if found inconsistency, but the caller does not hold
2109  *                      ldlm lock on the @child, then set @retry as true
2110  *
2111  * \retval              positive number for repaired cases
2112  * \retval              0 if nothing to be repaired
2113  * \retval              negative error number on failure
2114  */
2115 static int
2116 lfsck_namespace_dsd_single(const struct lu_env *env,
2117                            struct lfsck_component *com,
2118                            struct dt_object *child,
2119                            const struct lu_fid *pfid,
2120                            struct linkea_data *ldata,
2121                            struct lustre_handle *lh,
2122                            enum lfsck_namespace_inconsistency_type *type,
2123                            bool *retry)
2124 {
2125         struct lfsck_thread_info *info          = lfsck_env_info(env);
2126         struct lu_name           *cname         = &info->lti_name;
2127         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2128         struct lu_fid            *tfid          = &info->lti_fid3;
2129         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2130         struct dt_object         *parent        = NULL;
2131         int                       rc            = 0;
2132         ENTRY;
2133
2134         lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
2135         /* The unique linkEA entry with bad parent will be handled as orphan. */
2136         if (!fid_is_sane(tfid)) {
2137                 if (!lustre_handle_is_used(lh) && retry != NULL)
2138                         *retry = true;
2139                 else
2140                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2141                                                         pfid, lh, type);
2142
2143                 GOTO(out, rc);
2144         }
2145
2146         parent = lfsck_object_find_bottom(env, lfsck, tfid);
2147         if (IS_ERR(parent))
2148                 GOTO(out, rc = PTR_ERR(parent));
2149
2150         /* We trust the unique linkEA entry in spite of whether it matches the
2151          * ".." name entry or not. Because even if the linkEA entry is wrong
2152          * and the ".." name entry is right, we still cannot know via which
2153          * name entry the child will be referenced, since all known entries
2154          * have been verified during the first-stage scanning. */
2155         if (!dt_object_exists(parent)) {
2156                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2157                         *retry = true;
2158
2159                         GOTO(out, rc = 0);
2160                 }
2161
2162                 lfsck_ibits_unlock(lh, LCK_EX);
2163
2164 lost_parent:
2165                 /* Create the lost parent as an orphan. */
2166                 rc = lfsck_namespace_create_orphan(env, com, parent);
2167                 if (rc >= 0) {
2168                         /* Add the missing name entry to the parent. */
2169                         rc = lfsck_namespace_insert_normal(env, com, parent,
2170                                                         child, cname->ln_name);
2171                         if (unlikely(rc == -EEXIST)) {
2172                                 /* Unfortunately, someone reused the name
2173                                  * under the parent by race. So we have
2174                                  * to remove the linkEA entry from
2175                                  * current child object. It means that the
2176                                  * LFSCK cannot recover the system
2177                                  * totally back to its original status,
2178                                  * but it is necessary to make the
2179                                  * current system to be consistent. */
2180                                 rc = lfsck_namespace_shrink_linkea(env,
2181                                                 com, child, ldata,
2182                                                 cname, tfid, true);
2183                                 if (rc >= 0) {
2184                                         snprintf(info->lti_tmpbuf,
2185                                                  sizeof(info->lti_tmpbuf),
2186                                                  "-"DFID, PFID(pfid));
2187                                         rc = lfsck_namespace_insert_orphan(env,
2188                                                 com, child, info->lti_tmpbuf,
2189                                                 "D", NULL);
2190                                 }
2191                         }
2192                 }
2193
2194                 GOTO(out, rc);
2195         }
2196
2197         /* The unique linkEA entry with bad parent will be handled as orphan. */
2198         if (unlikely(!dt_try_as_dir(env, parent))) {
2199                 if (!lustre_handle_is_used(lh) && retry != NULL)
2200                         *retry = true;
2201                 else
2202                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2203                                                         pfid, lh, type);
2204
2205                 GOTO(out, rc);
2206         }
2207
2208         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2209                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
2210         if (rc == -ENOENT) {
2211                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2212                         *retry = true;
2213
2214                         GOTO(out, rc = 0);
2215                 }
2216
2217                 lfsck_ibits_unlock(lh, LCK_EX);
2218                 /* Add the missing name entry back to the namespace. */
2219                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2220                                                    cname->ln_name);
2221                 if (unlikely(rc == -ESTALE))
2222                         /* It may happen when the remote object has been
2223                          * removed, but the local MDT is not aware of that. */
2224                         goto lost_parent;
2225
2226                 if (unlikely(rc == -EEXIST)) {
2227                         /* Unfortunately, someone reused the name under the
2228                          * parent by race. So we have to remove the linkEA
2229                          * entry from current child object. It means that the
2230                          * LFSCK cannot recover the system totally back to
2231                          * its original status, but it is necessary to make
2232                          * the current system to be consistent.
2233                          *
2234                          * It also may be because of the LFSCK found some
2235                          * internal status of create operation. Under such
2236                          * case, nothing to be done. */
2237                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2238                                         parent, child, ldata, cname, tfid);
2239                         if (rc >= 0) {
2240                                 snprintf(info->lti_tmpbuf,
2241                                          sizeof(info->lti_tmpbuf),
2242                                          "-"DFID, PFID(pfid));
2243                                 rc = lfsck_namespace_insert_orphan(env, com,
2244                                         child, info->lti_tmpbuf, "D", NULL);
2245                         }
2246                 }
2247
2248                 GOTO(out, rc);
2249         }
2250
2251         if (rc != 0)
2252                 GOTO(out, rc);
2253
2254         if (!lu_fid_eq(tfid, cfid)) {
2255                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2256                         *retry = true;
2257
2258                         GOTO(out, rc = 0);
2259                 }
2260
2261                 lfsck_ibits_unlock(lh, LCK_EX);
2262                 /* The name entry references another MDT-object that
2263                  * may be created by the LFSCK for repairing dangling
2264                  * name entry. Try to replace it. */
2265                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2266                                                   tfid, cname);
2267                 if (rc == 0)
2268                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2269                                                         pfid, lh, type);
2270
2271                 GOTO(out, rc);
2272         }
2273
2274         /* The ".." name entry is wrong, update it. */
2275         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2276                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2277                         *retry = true;
2278
2279                         GOTO(out, rc = 0);
2280                 }
2281
2282                 *type = LNIT_UNMATCHED_PAIRS;
2283                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2284                                                 lfsck_dto2fid(parent), cname);
2285         }
2286
2287         GOTO(out, rc);
2288
2289 out:
2290         if (parent != NULL && !IS_ERR(parent))
2291                 lfsck_object_put(env, parent);
2292
2293         return rc;
2294 }
2295
2296 /**
2297  * Double Scan Directory object for multiple linkEA entries case.
2298  *
2299  * The given @child has multiple linkEA entries. There is at most one linkEA
2300  * entry will be valid, all the others will be removed. Firstly, the function
2301  * will try to find out the linkEA entry for which the name entry exists under
2302  * the given parent (@pfid). If there is no linkEA entry that matches the given
2303  * ".." name entry, then tries to find out the first linkEA entry that both the
2304  * parent and the name entry exist to rebuild a new ".." name entry.
2305  *
2306  * \param[in] env       pointer to the thread context
2307  * \param[in] com       pointer to the lfsck component
2308  * \param[in] child     pointer to the directory to be double scanned
2309  * \param[in] pfid      the FID corresponding to the ".." entry
2310  * \param[in] ldata     pointer to the linkEA data for the given @child
2311  * \param[in,out] lh    ldlm lock handler for the given @child
2312  * \param[out] type     to tell the caller what the inconsistency is
2313  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
2314  *
2315  * \retval              positive number for repaired cases
2316  * \retval              0 if nothing to be repaired
2317  * \retval              negative error number on failure
2318  */
2319 static int
2320 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2321                              struct lfsck_component *com,
2322                              struct dt_object *child,
2323                              const struct lu_fid *pfid,
2324                              struct linkea_data *ldata,
2325                              struct lustre_handle *lh,
2326                              enum lfsck_namespace_inconsistency_type *type,
2327                              bool lpf)
2328 {
2329         struct lfsck_thread_info *info          = lfsck_env_info(env);
2330         struct lu_name           *cname         = &info->lti_name;
2331         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2332         struct lu_fid            *tfid          = &info->lti_fid3;
2333         struct lu_fid            *pfid2         = &info->lti_fid4;
2334         struct lfsck_namespace   *ns            = com->lc_file_ram;
2335         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2336         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2337         struct dt_object         *parent        = NULL;
2338         struct linkea_data        ldata_new     = { 0 };
2339         int                       count         = 0;
2340         int                       rc            = 0;
2341         bool                      once          = true;
2342         ENTRY;
2343
2344 again:
2345         while (ldata->ld_lee != NULL) {
2346                 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
2347                                                     info->lti_key);
2348                 /* Drop repeated linkEA entries. */
2349                 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
2350                 /* Drop invalid linkEA entry. */
2351                 if (!fid_is_sane(tfid)) {
2352                         linkea_del_buf(ldata, cname);
2353                         continue;
2354                 }
2355
2356                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2357                  * then it is possible that: the directry object has ever
2358                  * been lost, but its name entry was there. In the former
2359                  * LFSCK run, during the first-stage scanning, the LFSCK
2360                  * found the dangling name entry, but it did not recreate
2361                  * the lost object, and when moved to the second-stage
2362                  * scanning, some children objects of the lost directory
2363                  * object were found, then the LFSCK recreated such lost
2364                  * directory object as an orphan.
2365                  *
2366                  * When the LFSCK runs again, if the dangling name is still
2367                  * there, the LFSCK should move the orphan directory object
2368                  * back to the normal namespace. */
2369                 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
2370                         linkea_next_entry(ldata);
2371                         continue;
2372                 }
2373
2374                 parent = lfsck_object_find_bottom(env, lfsck, tfid);
2375                 if (IS_ERR(parent))
2376                         RETURN(PTR_ERR(parent));
2377
2378                 if (!dt_object_exists(parent)) {
2379                         lfsck_object_put(env, parent);
2380                         if (ldata->ld_leh->leh_reccount > 1) {
2381                                 /* If it is NOT the last linkEA entry, then
2382                                  * there is still other chance to make the
2383                                  * child to be visible via other parent, then
2384                                  * remove this linkEA entry. */
2385                                 linkea_del_buf(ldata, cname);
2386                                 continue;
2387                         }
2388
2389                         break;
2390                 }
2391
2392                 /* The linkEA entry with bad parent will be removed. */
2393                 if (unlikely(!dt_try_as_dir(env, parent))) {
2394                         lfsck_object_put(env, parent);
2395                         linkea_del_buf(ldata, cname);
2396                         continue;
2397                 }
2398
2399                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2400                                (const struct dt_key *)cname->ln_name,
2401                                BYPASS_CAPA);
2402                 *pfid2 = *lfsck_dto2fid(parent);
2403                 if (rc == -ENOENT) {
2404                         lfsck_object_put(env, parent);
2405                         linkea_next_entry(ldata);
2406                         continue;
2407                 }
2408
2409                 if (rc != 0) {
2410                         lfsck_object_put(env, parent);
2411
2412                         RETURN(rc);
2413                 }
2414
2415                 if (lu_fid_eq(tfid, cfid)) {
2416                         lfsck_object_put(env, parent);
2417                         if (!lu_fid_eq(pfid, pfid2)) {
2418                                 *type = LNIT_UNMATCHED_PAIRS;
2419                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
2420                                                 com, child, pfid2, cname);
2421
2422                                 RETURN(rc);
2423                         }
2424
2425 rebuild:
2426                         /* It is the most common case that we find the
2427                          * name entry corresponding to the linkEA entry
2428                          * that matches the ".." name entry. */
2429                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2430                         if (rc != 0)
2431                                 RETURN(rc);
2432
2433                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
2434                         if (rc != 0)
2435                                 RETURN(rc);
2436
2437                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2438                                                             &ldata_new);
2439                         if (rc < 0)
2440                                 RETURN(rc);
2441
2442                         linkea_del_buf(ldata, cname);
2443                         linkea_first_entry(ldata);
2444                         /* There may be some invalid dangling name entries under
2445                          * other parent directories, remove all of them. */
2446                         while (ldata->ld_lee != NULL) {
2447                                 lfsck_namespace_unpack_linkea_entry(ldata,
2448                                                 cname, tfid, info->lti_key);
2449                                 if (!fid_is_sane(tfid))
2450                                         goto next;
2451
2452                                 parent = lfsck_object_find_bottom(env, lfsck,
2453                                                                   tfid);
2454                                 if (IS_ERR(parent)) {
2455                                         rc = PTR_ERR(parent);
2456                                         if (rc != -ENOENT &&
2457                                             bk->lb_param & LPF_FAILOUT)
2458                                                 RETURN(rc);
2459
2460                                         goto next;
2461                                 }
2462
2463                                 if (!dt_object_exists(parent)) {
2464                                         lfsck_object_put(env, parent);
2465                                         goto next;
2466                                 }
2467
2468                                 rc = lfsck_namespace_repair_dirent(env, com,
2469                                         parent, child, cname->ln_name,
2470                                         cname->ln_name, S_IFDIR, false, true);
2471                                 lfsck_object_put(env, parent);
2472                                 if (rc < 0) {
2473                                         if (bk->lb_param & LPF_FAILOUT)
2474                                                 RETURN(rc);
2475
2476                                         goto next;
2477                                 }
2478
2479                                 count += rc;
2480
2481 next:
2482                                 linkea_del_buf(ldata, cname);
2483                         }
2484
2485                         ns->ln_dirent_repaired += count;
2486
2487                         RETURN(rc);
2488                 }
2489
2490                 lfsck_ibits_unlock(lh, LCK_EX);
2491                 /* The name entry references another MDT-object that may be
2492                  * created by the LFSCK for repairing dangling name entry.
2493                  * Try to replace it. */
2494                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2495                                                   tfid, cname);
2496                 lfsck_object_put(env, parent);
2497                 if (rc < 0)
2498                         RETURN(rc);
2499
2500                 if (rc > 0)
2501                         goto rebuild;
2502
2503                 linkea_del_buf(ldata, cname);
2504         }
2505
2506         if (ldata->ld_leh->leh_reccount == 1) {
2507                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2508                                                 lh, type, NULL);
2509
2510                 RETURN(rc);
2511         }
2512
2513         /* All linkEA entries are invalid and removed, then handle the @child
2514          * as an orphan.*/
2515         if (ldata->ld_leh->leh_reccount == 0) {
2516                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2517                                                 type);
2518
2519                 RETURN(rc);
2520         }
2521
2522         linkea_first_entry(ldata);
2523         /* If the dangling name entry for the orphan directory object has
2524          * been remvoed, then just check whether the directory object is
2525          * still under the .lustre/lost+found/MDTxxxx/ or not. */
2526         if (lpf) {
2527                 lpf = false;
2528                 goto again;
2529         }
2530
2531         /* There is no linkEA entry that matches the ".." name entry. Find
2532          * the first linkEA entry that both parent and name entry exist to
2533          * rebuild a new ".." name entry. */
2534         if (once) {
2535                 once = false;
2536                 goto again;
2537         }
2538
2539         RETURN(rc);
2540 }
2541
2542 /**
2543  * Double scan the directory object for namespace LFSCK.
2544  *
2545  * This function will verify the <parent, child> pairs in the namespace tree:
2546  * the parent references the child via some name entry that should be in the
2547  * child's linkEA entry, the child should back references the parent via its
2548  * ".." name entry.
2549  *
2550  * The LFSCK will scan every linkEA entry in turn until find out the first
2551  * matched pairs. If found, then all other linkEA entries will be dropped.
2552  * If all the linkEA entries cannot match the ".." name entry, then there
2553  * are serveral possible cases:
2554  *
2555  * 1) If there is only one linkEA entry, then trust it as long as the PFID
2556  *    in the linkEA entry is valid.
2557  *
2558  * 2) If there are multiple linkEA entries, then try to find the linkEA
2559  *    that matches the ".." name entry. If found, then all other entries
2560  *    are invalid; otherwise, it is quite possible that the ".." name entry
2561  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
2562  *    entry according to the first valid linkEA entry (both the parent and
2563  *    the name entry should exist).
2564  *
2565  * 3) If the directory object has no (valid) linkEA entry, then the
2566  *    directory object will be handled as pure orphan and inserted
2567  *    in the .lustre/lost+found/MDTxxxx/ with the name:
2568  *    ${self_FID}-${PFID}-D-${conflict_version}
2569  *
2570  * \param[in] env       pointer to the thread context
2571  * \param[in] com       pointer to the lfsck component
2572  * \param[in] child     pointer to the directory object to be handled
2573  * \param[in] flags     to indicate the specical checking on the @child
2574  *
2575  * \retval              positive number for repaired cases
2576  * \retval              0 if nothing to be repaired
2577  * \retval              negative error number on failure
2578  */
2579 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
2580                                            struct lfsck_component *com,
2581                                            struct dt_object *child, __u8 flags)
2582 {
2583         struct lfsck_thread_info *info          = lfsck_env_info(env);
2584         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2585         struct lu_fid            *pfid          = &info->lti_fid2;
2586         struct lfsck_namespace   *ns            = com->lc_file_ram;
2587         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2588         struct lustre_handle      lh            = { 0 };
2589         struct linkea_data        ldata         = { 0 };
2590         bool                      unknown       = false;
2591         bool                      lpf           = false;
2592         bool                      retry         = false;
2593         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
2594         int                       rc            = 0;
2595         ENTRY;
2596
2597         LASSERT(!dt_object_remote(child));
2598
2599         if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
2600                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
2601                        "the namespace LFSCK, then the LFSCK cannot guarantee"
2602                        "all the name entries have been verified in first-stage"
2603                        "scanning. So have to skip orphan related handling for"
2604                        "the directory object "DFID" with remote name entry\n",
2605                        lfsck_lfsck2name(lfsck), PFID(cfid));
2606
2607                 RETURN(0);
2608         }
2609
2610         if (unlikely(!dt_try_as_dir(env, child)))
2611                 GOTO(out, rc = -ENOTDIR);
2612
2613         /* We only take ldlm lock on the @child when required. When the
2614          * logic comes here for the first time, it is always false. */
2615         if (0) {
2616
2617 lock:
2618                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2619                                       MDS_INODELOCK_UPDATE |
2620                                       MDS_INODELOCK_XATTR, LCK_EX);
2621                 if (rc != 0)
2622                         GOTO(out, rc);
2623         }
2624
2625         dt_read_lock(env, child, 0);
2626         if (unlikely(lfsck_is_dead_obj(child))) {
2627                 dt_read_unlock(env, child);
2628
2629                 GOTO(out, rc = 0);
2630         }
2631
2632         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
2633                        (const struct dt_key *)dotdot, BYPASS_CAPA);
2634         if (rc != 0) {
2635                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
2636                         dt_read_unlock(env, child);
2637
2638                         GOTO(out, rc);
2639                 }
2640
2641                 if (!lustre_handle_is_used(&lh)) {
2642                         dt_read_unlock(env, child);
2643                         goto lock;
2644                 }
2645
2646                 fid_zero(pfid);
2647         } else if (lfsck->li_lpf_obj != NULL &&
2648                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
2649                 lpf = true;
2650         }
2651
2652         rc = lfsck_links_read(env, child, &ldata);
2653         dt_read_unlock(env, child);
2654         if (rc != 0) {
2655                 if (rc != -ENODATA && rc != -EINVAL)
2656                         GOTO(out, rc);
2657
2658                 if (!lustre_handle_is_used(&lh))
2659                         goto lock;
2660
2661                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
2662                         /* Remove the corrupted linkEA. */
2663                         rc = lfsck_namespace_links_remove(env, com, child);
2664                         if (rc == 0)
2665                                 /* Here, because of the crashed linkEA, we
2666                                  * cannot know whether there is some parent
2667                                  * that references the child directory via
2668                                  * some name entry or not. So keep it there,
2669                                  * when the LFSCK run next time, if there is
2670                                  * some parent that references this object,
2671                                  * then the LFSCK can rebuild the linkEA;
2672                                  * otherwise, this object will be handled
2673                                  * as orphan as above. */
2674                                 unknown = true;
2675                 } else {
2676                         /* 1. If we have neither ".." nor linkEA,
2677                          *    then it is an orphan.
2678                          *
2679                          * 2. If we only have the ".." name entry,
2680                          *    but no parent references this child
2681                          *    directory, then handle it as orphan. */
2682                         lfsck_ibits_unlock(&lh, LCK_EX);
2683                         type = LNIT_MUL_REF;
2684                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2685                                  "-"DFID, PFID(pfid));
2686                         rc = lfsck_namespace_insert_orphan(env, com, child,
2687                                                 info->lti_tmpbuf, "D", NULL);
2688                 }
2689
2690                 GOTO(out, rc);
2691         }
2692
2693         linkea_first_entry(&ldata);
2694         /* This is the most common case: the object has unique linkEA entry. */
2695         if (ldata.ld_leh->leh_reccount == 1) {
2696                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
2697                                                 &lh, &type, &retry);
2698                 if (retry) {
2699                         LASSERT(!lustre_handle_is_used(&lh));
2700
2701                         retry = false;
2702                         goto lock;
2703                 }
2704
2705                 GOTO(out, rc);
2706         }
2707
2708         if (!lustre_handle_is_used(&lh))
2709                 goto lock;
2710
2711         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
2712                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
2713                                                 &type);
2714
2715                 GOTO(out, rc);
2716         }
2717
2718         /* When we come here, the cases usually like that:
2719          * 1) The directory object has a corrupted linkEA entry. During the
2720          *    first-stage scanning, the LFSCK cannot know such corruption,
2721          *    then it appends the right linkEA entry according to the found
2722          *    name entry after the bad one.
2723          *
2724          * 2) The directory object has a right linkEA entry. During the
2725          *    first-stage scanning, the LFSCK finds some bad name entry,
2726          *    but the LFSCK cannot aware that at that time, then it adds
2727          *    the bad linkEA entry for further processing. */
2728         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
2729                                           &lh, &type, lpf);
2730
2731         GOTO(out, rc);
2732
2733 out:
2734         lfsck_ibits_unlock(&lh, LCK_EX);
2735         if (rc > 0) {
2736                 switch (type) {
2737                 case LNIT_BAD_LINKEA:
2738                         ns->ln_linkea_repaired++;
2739                         break;
2740                 case LNIT_UNMATCHED_PAIRS:
2741                         ns->ln_unmatched_pairs_repaired++;
2742                         break;
2743                 case LNIT_MUL_REF:
2744                         ns->ln_mul_ref_repaired++;
2745                         break;
2746                 default:
2747                         break;
2748                 }
2749         }
2750
2751         if (unknown)
2752                 ns->ln_unknown_inconsistency++;
2753
2754         return rc;
2755 }
2756
2757 /**
2758  * Double scan the MDT-object for namespace LFSCK.
2759  *
2760  * If the MDT-object contains invalid or repeated linkEA entries, then drop
2761  * those entries from the linkEA; if the linkEA becomes empty or the object
2762  * has no linkEA, then it is an orphan and will be added into the directory
2763  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
2764  * the remote parent; if the name entry corresponding to some linkEA entry
2765  * is lost, then add the name entry back to the namespace.
2766  *
2767  * \param[in] env       pointer to the thread context
2768  * \param[in] com       pointer to the lfsck component
2769  * \param[in] child     pointer to the dt_object to be handled
2770  * \param[in] flags     some hints to indicate how the @child should be handled
2771  *
2772  * \retval              positive number for repaired cases
2773  * \retval              0 if nothing to be repaired
2774  * \retval              negative error number on failure
2775  */
2776 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
2777                                            struct lfsck_component *com,
2778                                            struct dt_object *child, __u8 flags)
2779 {
2780         struct lfsck_thread_info *info     = lfsck_env_info(env);
2781         struct lu_attr           *la       = &info->lti_la;
2782         struct lu_name           *cname    = &info->lti_name;
2783         struct lu_fid            *pfid     = &info->lti_fid;
2784         struct lu_fid            *cfid     = &info->lti_fid2;
2785         struct lfsck_instance    *lfsck    = com->lc_lfsck;
2786         struct lfsck_namespace   *ns       = com->lc_file_ram;
2787         struct dt_object         *parent   = NULL;
2788         struct linkea_data        ldata    = { 0 };
2789         bool                      repaired = false;
2790         int                       count    = 0;
2791         int                       rc;
2792         ENTRY;
2793
2794         dt_read_lock(env, child, 0);
2795         if (unlikely(lfsck_is_dead_obj(child))) {
2796                 dt_read_unlock(env, child);
2797
2798                 RETURN(0);
2799         }
2800
2801         if (S_ISDIR(lfsck_object_type(child))) {
2802                 dt_read_unlock(env, child);
2803                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
2804
2805                 RETURN(rc);
2806         }
2807
2808         rc = lfsck_links_read(env, child, &ldata);
2809         dt_read_unlock(env, child);
2810         if (rc != 0)
2811                 GOTO(out, rc);
2812
2813         linkea_first_entry(&ldata);
2814         while (ldata.ld_lee != NULL) {
2815                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
2816                                                     info->lti_key);
2817                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
2818                                                          false);
2819                 /* Found repeated linkEA entries */
2820                 if (rc > 0) {
2821                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2822                                                 &ldata, cname, pfid, false);
2823                         if (rc < 0)
2824                                 GOTO(out, rc);
2825
2826                         if (rc == 0)
2827                                 continue;
2828
2829                         repaired = true;
2830
2831                         /* fall through */
2832                 }
2833
2834                 /* Invalid PFID in the linkEA entry. */
2835                 if (!fid_is_sane(pfid)) {
2836                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2837                                                 &ldata, cname, pfid, true);
2838                         if (rc < 0)
2839                                 GOTO(out, rc);
2840
2841                         if (rc > 0)
2842                                 repaired = true;
2843
2844                         continue;
2845                 }
2846
2847                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
2848                 if (IS_ERR(parent))
2849                         GOTO(out, rc = PTR_ERR(parent));
2850
2851                 if (!dt_object_exists(parent)) {
2852
2853 lost_parent:
2854                         if (ldata.ld_leh->leh_reccount > 1) {
2855                                 /* If it is NOT the last linkEA entry, then
2856                                  * there is still other chance to make the
2857                                  * child to be visible via other parent, then
2858                                  * remove this linkEA entry. */
2859                                 rc = lfsck_namespace_shrink_linkea(env, com,
2860                                         child, &ldata, cname, pfid, true);
2861                         } else {
2862                                 /* Create the lost parent as an orphan. */
2863                                 rc = lfsck_namespace_create_orphan(env, com,
2864                                                                    parent);
2865                                 if (rc < 0) {
2866                                         lfsck_object_put(env, parent);
2867
2868                                         GOTO(out, rc);
2869                                 }
2870
2871                                 if (rc > 0)
2872                                         repaired = true;
2873
2874                                 /* Add the missing name entry to the parent. */
2875                                 rc = lfsck_namespace_insert_normal(env, com,
2876                                                 parent, child, cname->ln_name);
2877                                 if (unlikely(rc == -EEXIST))
2878                                         /* Unfortunately, someone reused the
2879                                          * name under the parent by race. So we
2880                                          * have to remove the linkEA entry from
2881                                          * current child object. It means that
2882                                          * the LFSCK cannot recover the system
2883                                          * totally back to its original status,
2884                                          * but it is necessary to make the
2885                                          * current system to be consistent. */
2886                                         rc = lfsck_namespace_shrink_linkea(env,
2887                                                         com, child, &ldata,
2888                                                         cname, pfid, true);
2889                                 else
2890                                         linkea_next_entry(&ldata);
2891                         }
2892
2893                         lfsck_object_put(env, parent);
2894                         if (rc < 0)
2895                                 GOTO(out, rc);
2896
2897                         if (rc > 0)
2898                                 repaired = true;
2899
2900                         continue;
2901                 }
2902
2903                 /* The linkEA entry with bad parent will be removed. */
2904                 if (unlikely(!dt_try_as_dir(env, parent))) {
2905                         lfsck_object_put(env, parent);
2906                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2907                                                 &ldata, cname, pfid, true);
2908                         if (rc < 0)
2909                                 GOTO(out, rc);
2910
2911                         if (rc > 0)
2912                                 repaired = true;
2913
2914                         continue;
2915                 }
2916
2917                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
2918                                (const struct dt_key *)cname->ln_name,
2919                                BYPASS_CAPA);
2920                 if (rc != 0 && rc != -ENOENT) {
2921                         lfsck_object_put(env, parent);
2922
2923                         GOTO(out, rc);
2924                 }
2925
2926                 if (rc == 0) {
2927                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
2928                                 /* It is the most common case that we
2929                                  * find the name entry corresponding
2930                                  * to the linkEA entry. */
2931                                 lfsck_object_put(env, parent);
2932                                 linkea_next_entry(&ldata);
2933                         } else {
2934                                 /* The name entry references another
2935                                  * MDT-object that may be created by
2936                                  * the LFSCK for repairing dangling
2937                                  * name entry. Try to replace it. */
2938                                 rc = lfsck_namespace_replace_cond(env, com,
2939                                                 parent, child, cfid, cname);
2940                                 lfsck_object_put(env, parent);
2941                                 if (rc < 0)
2942                                         GOTO(out, rc);
2943
2944                                 if (rc > 0) {
2945                                         repaired = true;
2946                                         linkea_next_entry(&ldata);
2947                                 } else {
2948                                         rc = lfsck_namespace_shrink_linkea(env,
2949                                                         com, child, &ldata,
2950                                                         cname, pfid, true);
2951                                         if (rc < 0)
2952                                                 GOTO(out, rc);
2953
2954                                         if (rc > 0)
2955                                                 repaired = true;
2956                                 }
2957                         }
2958
2959                         continue;
2960                 }
2961
2962                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2963                 if (rc != 0)
2964                         GOTO(out, rc);
2965
2966                 /* If there is no name entry in the parent dir and the object
2967                  * link count is less than the linkea entries count, then the
2968                  * linkea entry should be removed. */
2969                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
2970                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2971                                         parent, child, &ldata, cname, pfid);
2972                         lfsck_object_put(env, parent);
2973                         if (rc < 0)
2974                                 GOTO(out, rc);
2975
2976                         if (rc > 0)
2977                                 repaired = true;
2978
2979                         continue;
2980                 }
2981
2982                 /* Add the missing name entry back to the namespace. */
2983                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2984                                                    cname->ln_name);
2985                 if (unlikely(rc == -ESTALE))
2986                         /* It may happen when the remote object has been
2987                          * removed, but the local MDT is not aware of that. */
2988                         goto lost_parent;
2989
2990                 if (unlikely(rc == -EEXIST))
2991                         /* Unfortunately, someone reused the name under the
2992                          * parent by race. So we have to remove the linkEA
2993                          * entry from current child object. It means that the
2994                          * LFSCK cannot recover the system totally back to
2995                          * its original status, but it is necessary to make
2996                          * the current system to be consistent.
2997                          *
2998                          * It also may be because of the LFSCK found some
2999                          * internal status of create operation. Under such
3000                          * case, nothing to be done. */
3001                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
3002                                         parent, child, &ldata, cname, pfid);
3003                 else
3004                         linkea_next_entry(&ldata);
3005
3006                 lfsck_object_put(env, parent);
3007                 if (rc < 0)
3008                         GOTO(out, rc);
3009
3010                 if (rc > 0)
3011                         repaired = true;
3012         }
3013
3014         GOTO(out, rc = 0);
3015
3016 out:
3017         if (rc < 0 && rc != -ENODATA)
3018                 return rc;
3019
3020         if (rc == 0) {
3021                 LASSERT(ldata.ld_leh != NULL);
3022
3023                 count = ldata.ld_leh->leh_reccount;
3024         }
3025
3026         if (count == 0) {
3027                 /* If the child becomes orphan, then insert it into
3028                  * the global .lustre/lost+found/MDTxxxx directory. */
3029                 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
3030                                                    &count);
3031                 if (rc < 0)
3032                         return rc;
3033
3034                 if (rc > 0) {
3035                         ns->ln_mul_ref_repaired++;
3036                         repaired = true;
3037                 }
3038         }
3039
3040         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
3041         if (rc != 0)
3042                 return rc;
3043
3044         if (la->la_nlink != count) {
3045                 /* XXX: there will be other patch(es) for MDT-object
3046                  *      hard links verification. */
3047         }
3048
3049         if (repaired) {
3050                 if (la->la_nlink > 1)
3051                         ns->ln_mul_linked_repaired++;
3052
3053                 if (rc == 0)
3054                         rc = 1;
3055         }
3056
3057         return rc;
3058 }
3059
3060 static void lfsck_namespace_dump_statistics(struct seq_file *m,
3061                                             struct lfsck_namespace *ns,
3062                                             __u64 checked_phase1,
3063                                             __u64 checked_phase2,
3064                                             __u32 time_phase1,
3065                                             __u32 time_phase2)
3066 {
3067         seq_printf(m, "checked_phase1: "LPU64"\n"
3068                       "checked_phase2: "LPU64"\n"
3069                       "updated_phase1: "LPU64"\n"
3070                       "updated_phase2: "LPU64"\n"
3071                       "failed_phase1: "LPU64"\n"
3072                       "failed_phase2: "LPU64"\n"
3073                       "directories: "LPU64"\n"
3074                       "dirent_repaired: "LPU64"\n"
3075                       "linkea_repaired: "LPU64"\n"
3076                       "nlinks_repaired: "LPU64"\n"
3077                       "multiple_linked_checked: "LPU64"\n"
3078                       "multiple_linked_repaired: "LPU64"\n"
3079                       "unknown_inconsistency: "LPU64"\n"
3080                       "unmatched_pairs_repaired: "LPU64"\n"
3081                       "dangling_repaired: "LPU64"\n"
3082                       "multiple_referenced_repaired: "LPU64"\n"
3083                       "bad_file_type_repaired: "LPU64"\n"
3084                       "lost_dirent_repaired: "LPU64"\n"
3085                       "success_count: %u\n"
3086                       "run_time_phase1: %u seconds\n"
3087                       "run_time_phase2: %u seconds\n",
3088                       checked_phase1,
3089                       checked_phase2,
3090                       ns->ln_items_repaired,
3091                       ns->ln_objs_repaired_phase2,
3092                       ns->ln_items_failed,
3093                       ns->ln_objs_failed_phase2,
3094                       ns->ln_dirs_checked,
3095                       ns->ln_dirent_repaired,
3096                       ns->ln_linkea_repaired,
3097                       ns->ln_objs_nlink_repaired,
3098                       ns->ln_mul_linked_checked,
3099                       ns->ln_mul_linked_repaired,
3100                       ns->ln_unknown_inconsistency,
3101                       ns->ln_unmatched_pairs_repaired,
3102                       ns->ln_dangling_repaired,
3103                       ns->ln_mul_ref_repaired,
3104                       ns->ln_bad_type_repaired,
3105                       ns->ln_lost_dirent_repaired,
3106                       ns->ln_success_count,
3107                       time_phase1,
3108                       time_phase2);
3109 }
3110
3111 /* namespace APIs */
3112
3113 static int lfsck_namespace_reset(const struct lu_env *env,
3114                                  struct lfsck_component *com, bool init)
3115 {
3116         struct lfsck_instance   *lfsck = com->lc_lfsck;
3117         struct lfsck_namespace  *ns    = com->lc_file_ram;
3118         struct dt_object        *root;
3119         struct dt_object        *dto;
3120         int                      rc;
3121         ENTRY;
3122
3123         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
3124         if (IS_ERR(root))
3125                 GOTO(log, rc = PTR_ERR(root));
3126
3127         if (unlikely(!dt_try_as_dir(env, root)))
3128                 GOTO(put, rc = -ENOTDIR);
3129
3130         down_write(&com->lc_sem);
3131         if (init) {
3132                 memset(ns, 0, sizeof(*ns));
3133         } else {
3134                 __u32 count = ns->ln_success_count;
3135                 __u64 last_time = ns->ln_time_last_complete;
3136
3137                 memset(ns, 0, sizeof(*ns));
3138                 ns->ln_success_count = count;
3139                 ns->ln_time_last_complete = last_time;
3140         }
3141         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
3142         ns->ln_status = LS_INIT;
3143
3144         rc = local_object_unlink(env, lfsck->li_bottom, root,
3145                                  lfsck_namespace_name);
3146         if (rc != 0)
3147                 GOTO(out, rc);
3148
3149         lfsck_object_put(env, com->lc_obj);
3150         com->lc_obj = NULL;
3151         dto = local_index_find_or_create(env, lfsck->li_los, root,
3152                                          lfsck_namespace_name,
3153                                          S_IFREG | S_IRUGO | S_IWUSR,
3154                                          &dt_lfsck_features);
3155         if (IS_ERR(dto))
3156                 GOTO(out, rc = PTR_ERR(dto));
3157
3158         com->lc_obj = dto;
3159         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
3160         if (rc != 0)
3161                 GOTO(out, rc);
3162
3163         rc = lfsck_namespace_store(env, com, true);
3164
3165         GOTO(out, rc);
3166
3167 out:
3168         up_write(&com->lc_sem);
3169
3170 put:
3171         lu_object_put(env, &root->do_lu);
3172 log:
3173         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
3174                lfsck_lfsck2name(lfsck), rc);
3175         return rc;
3176 }
3177
3178 static void
3179 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
3180                      bool new_checked)
3181 {
3182         struct lfsck_namespace *ns = com->lc_file_ram;
3183
3184         down_write(&com->lc_sem);
3185         if (new_checked)
3186                 com->lc_new_checked++;
3187         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3188         up_write(&com->lc_sem);
3189 }
3190
3191 static int lfsck_namespace_checkpoint(const struct lu_env *env,
3192                                       struct lfsck_component *com, bool init)
3193 {
3194         struct lfsck_instance   *lfsck = com->lc_lfsck;
3195         struct lfsck_namespace  *ns    = com->lc_file_ram;
3196         int                      rc;
3197
3198         if (!init) {
3199                 rc = lfsck_checkpoint_generic(env, com);
3200                 if (rc != 0)
3201                         goto log;
3202         }
3203
3204         down_write(&com->lc_sem);
3205         if (init) {
3206                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
3207         } else {
3208                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
3209                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3210                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3211                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
3212                 ns->ln_items_checked += com->lc_new_checked;
3213                 com->lc_new_checked = 0;
3214         }
3215
3216         rc = lfsck_namespace_store(env, com, false);
3217         up_write(&com->lc_sem);
3218
3219 log:
3220         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
3221                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3222                lfsck->li_pos_current.lp_oit_cookie,
3223                PFID(&lfsck->li_pos_current.lp_dir_parent),
3224                lfsck->li_pos_current.lp_dir_cookie, rc);
3225
3226         return rc > 0 ? 0 : rc;
3227 }
3228
3229 static int lfsck_namespace_prep(const struct lu_env *env,
3230                                 struct lfsck_component *com,
3231                                 struct lfsck_start_param *lsp)
3232 {
3233         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3234         struct lfsck_namespace  *ns     = com->lc_file_ram;
3235         struct lfsck_position   *pos    = &com->lc_pos_start;
3236         int                      rc;
3237
3238         if (ns->ln_status == LS_COMPLETED) {
3239                 rc = lfsck_namespace_reset(env, com, false);
3240                 if (rc == 0)
3241                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
3242
3243                 if (rc != 0) {
3244                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
3245                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
3246
3247                         return rc;
3248                 }
3249         }
3250
3251         down_write(&com->lc_sem);
3252         ns->ln_time_latest_start = cfs_time_current_sec();
3253         spin_lock(&lfsck->li_lock);
3254
3255         if (ns->ln_flags & LF_SCANNED_ONCE) {
3256                 if (!lfsck->li_drop_dryrun ||
3257                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3258                         ns->ln_status = LS_SCANNING_PHASE2;
3259                         list_move_tail(&com->lc_link,
3260                                        &lfsck->li_list_double_scan);
3261                         if (!list_empty(&com->lc_link_dir))
3262                                 list_del_init(&com->lc_link_dir);
3263                         lfsck_pos_set_zero(pos);
3264                 } else {
3265                         ns->ln_status = LS_SCANNING_PHASE1;
3266                         ns->ln_run_time_phase1 = 0;
3267                         ns->ln_run_time_phase2 = 0;
3268                         ns->ln_items_checked = 0;
3269                         ns->ln_items_repaired = 0;
3270                         ns->ln_items_failed = 0;
3271                         ns->ln_dirs_checked = 0;
3272                         ns->ln_objs_checked_phase2 = 0;
3273                         ns->ln_objs_repaired_phase2 = 0;
3274                         ns->ln_objs_failed_phase2 = 0;
3275                         ns->ln_objs_nlink_repaired = 0;
3276                         ns->ln_dirent_repaired = 0;
3277                         ns->ln_linkea_repaired = 0;
3278                         ns->ln_mul_linked_checked = 0;
3279                         ns->ln_mul_linked_repaired = 0;
3280                         ns->ln_unknown_inconsistency = 0;
3281                         ns->ln_unmatched_pairs_repaired = 0;
3282                         ns->ln_dangling_repaired = 0;
3283                         ns->ln_mul_ref_repaired = 0;
3284                         ns->ln_bad_type_repaired = 0;
3285                         ns->ln_lost_dirent_repaired = 0;
3286                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
3287                         if (list_empty(&com->lc_link_dir))
3288                                 list_add_tail(&com->lc_link_dir,
3289                                               &lfsck->li_list_dir);
3290                         *pos = ns->ln_pos_first_inconsistent;
3291                 }
3292         } else {
3293                 ns->ln_status = LS_SCANNING_PHASE1;
3294                 if (list_empty(&com->lc_link_dir))
3295                         list_add_tail(&com->lc_link_dir,
3296                                       &lfsck->li_list_dir);
3297                 if (!lfsck->li_drop_dryrun ||
3298                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3299                         *pos = ns->ln_pos_last_checkpoint;
3300                         pos->lp_oit_cookie++;
3301                 } else {
3302                         *pos = ns->ln_pos_first_inconsistent;
3303                 }
3304         }
3305
3306         spin_unlock(&lfsck->li_lock);
3307         up_write(&com->lc_sem);
3308
3309         rc = lfsck_start_assistant(env, com, lsp);
3310
3311         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
3312                DFID", "LPX64"]: rc = %d\n",
3313                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
3314                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
3315
3316         return rc;
3317 }
3318
3319 static int lfsck_namespace_exec_oit(const struct lu_env *env,
3320                                     struct lfsck_component *com,
3321                                     struct dt_object *obj)
3322 {
3323         struct lfsck_thread_info *info  = lfsck_env_info(env);
3324         struct lfsck_namespace   *ns    = com->lc_file_ram;
3325         struct lfsck_instance    *lfsck = com->lc_lfsck;
3326         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
3327         struct lu_attr           *la    = &info->lti_la;
3328         struct lu_fid            *pfid  = &info->lti_fid2;
3329         struct lu_name           *cname = &info->lti_name;
3330         struct lu_seq_range      *range = &info->lti_range;
3331         struct dt_device         *dev   = lfsck->li_bottom;
3332         struct seq_server_site   *ss    =
3333                                 lu_site2seq(dev->dd_lu_dev.ld_site);
3334         struct linkea_data        ldata = { 0 };
3335         __u32                     idx   = lfsck_dev_idx(dev);
3336         int                       rc;
3337         ENTRY;
3338
3339         rc = lfsck_links_read(env, obj, &ldata);
3340         if (rc == -ENOENT)
3341                 GOTO(out, rc = 0);
3342
3343         /* -EINVAL means crashed linkEA, should be verified. */
3344         if (rc == -EINVAL) {
3345                 rc = lfsck_namespace_trace_update(env, com, fid,
3346                                                   LNTF_CHECK_LINKEA, true);
3347                 if (rc == 0) {
3348                         struct lustre_handle lh = { 0 };
3349
3350                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
3351                                               MDS_INODELOCK_UPDATE |
3352                                               MDS_INODELOCK_XATTR, LCK_EX);
3353                         if (rc == 0) {
3354                                 rc = lfsck_namespace_links_remove(env, com,
3355                                                                   obj);
3356                                 lfsck_ibits_unlock(&lh, LCK_EX);
3357                         }
3358                 }
3359
3360                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
3361         }
3362
3363         /* zero-linkEA object may be orphan, but it also maybe because
3364          * of upgrading. Currently, we cannot record it for double scan.
3365          * Because it may cause the LFSCK tracing file to be too large. */
3366         if (rc == -ENODATA) {
3367                 if (S_ISDIR(lfsck_object_type(obj)))
3368                         GOTO(out, rc = 0);
3369
3370                 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3371                 if (rc != 0)
3372                         GOTO(out, rc);
3373
3374                 if (la->la_nlink > 1)
3375                         rc = lfsck_namespace_trace_update(env, com, fid,
3376                                                 LNTF_CHECK_LINKEA, true);
3377
3378                 GOTO(out, rc);
3379         }
3380
3381         if (rc != 0)
3382                 GOTO(out, rc);
3383
3384         /* Record multiple-linked object. */
3385         if (ldata.ld_leh->leh_reccount > 1) {
3386                 rc = lfsck_namespace_trace_update(env, com, fid,
3387                                                   LNTF_CHECK_LINKEA, true);
3388
3389                 GOTO(out, rc);
3390         }
3391
3392         linkea_first_entry(&ldata);
3393         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
3394         if (!fid_is_sane(pfid)) {
3395                 rc = lfsck_namespace_trace_update(env, com, fid,
3396                                                   LNTF_CHECK_PARENT, true);
3397         } else {
3398                 fld_range_set_mdt(range);
3399                 rc = fld_local_lookup(env, ss->ss_server_fld,
3400                                       fid_seq(pfid), range);
3401                 if ((rc == -ENOENT) ||
3402                     (rc == 0 && range->lsr_index != idx)) {
3403                         rc = lfsck_namespace_trace_update(env, com, fid,
3404                                                 LNTF_CHECK_LINKEA, true);
3405                 } else {
3406                         if (S_ISDIR(lfsck_object_type(obj)))
3407                                 GOTO(out, rc = 0);
3408
3409                         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3410                         if (rc != 0)
3411                                 GOTO(out, rc);
3412
3413                         if (la->la_nlink > 1)
3414                                 rc = lfsck_namespace_trace_update(env, com,
3415                                                 fid, LNTF_CHECK_LINKEA, true);
3416                 }
3417         }
3418
3419         GOTO(out, rc);
3420
3421 out:
3422         down_write(&com->lc_sem);
3423         com->lc_new_checked++;
3424         if (S_ISDIR(lfsck_object_type(obj)))
3425                 ns->ln_dirs_checked++;
3426         if (rc != 0)
3427                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3428         up_write(&com->lc_sem);
3429
3430         return rc;
3431 }
3432
3433 static int lfsck_namespace_exec_dir(const struct lu_env *env,
3434                                     struct lfsck_component *com,
3435                                     struct lu_dirent *ent, __u16 type)
3436 {