Whamcloud - gitweb
LU-5516 lfsck: repair orphan parent MDT-object
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/* Value expected in lfsck_namespace::ln_magic; checked when the file is loaded. */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/* Positive results returned by lfsck_namespace_check_exist(). */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,
        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,
        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};

static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 static struct lfsck_namespace_req *
55 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
56                                    struct lu_dirent *ent, __u16 type)
57 {
58         struct lfsck_namespace_req *lnr;
59         int                         size;
60
61         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
62         OBD_ALLOC(lnr, size);
63         if (lnr == NULL)
64                 return ERR_PTR(-ENOMEM);
65
66         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
67         lu_object_get(&lfsck->li_obj_dir->do_lu);
68         lnr->lnr_obj = lfsck->li_obj_dir;
69         lnr->lnr_fid = ent->lde_fid;
70         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
71         lnr->lnr_dir_cookie = ent->lde_hash;
72         lnr->lnr_attr = ent->lde_attrs;
73         lnr->lnr_size = size;
74         lnr->lnr_type = type;
75         lnr->lnr_namelen = ent->lde_namelen;
76         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
77
78         return lnr;
79 }
80
81 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
82                                                struct lfsck_assistant_req *lar)
83 {
84         struct lfsck_namespace_req *lnr =
85                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
86
87         lu_object_put(env, &lnr->lnr_obj->do_lu);
88         OBD_FREE(lnr, lnr->lnr_size);
89 }
90
91 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
92                                       struct lfsck_namespace *src)
93 {
94         dst->ln_magic = le32_to_cpu(src->ln_magic);
95         dst->ln_status = le32_to_cpu(src->ln_status);
96         dst->ln_flags = le32_to_cpu(src->ln_flags);
97         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
98         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
99         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
100         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
101         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
102         dst->ln_time_last_checkpoint =
103                                 le64_to_cpu(src->ln_time_last_checkpoint);
104         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
105                                  &src->ln_pos_latest_start);
106         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
107                                  &src->ln_pos_last_checkpoint);
108         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
109                                  &src->ln_pos_first_inconsistent);
110         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
111         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
112         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
113         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
114         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
115         dst->ln_objs_repaired_phase2 =
116                                 le64_to_cpu(src->ln_objs_repaired_phase2);
117         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
118         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
119         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
120                       &src->ln_fid_latest_scanned_phase2);
121         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
122         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
123         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
124         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
125         dst->ln_unknown_inconsistency =
126                                 le64_to_cpu(src->ln_unknown_inconsistency);
127         dst->ln_unmatched_pairs_repaired =
128                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
129         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
130         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
131         dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
132         dst->ln_lost_dirent_repaired =
133                                 le64_to_cpu(src->ln_lost_dirent_repaired);
134 }
135
136 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
137                                       struct lfsck_namespace *src)
138 {
139         dst->ln_magic = cpu_to_le32(src->ln_magic);
140         dst->ln_status = cpu_to_le32(src->ln_status);
141         dst->ln_flags = cpu_to_le32(src->ln_flags);
142         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
143         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
144         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
145         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
146         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
147         dst->ln_time_last_checkpoint =
148                                 cpu_to_le64(src->ln_time_last_checkpoint);
149         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
150                                  &src->ln_pos_latest_start);
151         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
152                                  &src->ln_pos_last_checkpoint);
153         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
154                                  &src->ln_pos_first_inconsistent);
155         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
156         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
157         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
158         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
159         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
160         dst->ln_objs_repaired_phase2 =
161                                 cpu_to_le64(src->ln_objs_repaired_phase2);
162         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
163         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
164         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
165                       &src->ln_fid_latest_scanned_phase2);
166         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
167         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
168         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
169         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
170         dst->ln_unknown_inconsistency =
171                                 cpu_to_le64(src->ln_unknown_inconsistency);
172         dst->ln_unmatched_pairs_repaired =
173                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
174         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
175         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
176         dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
177         dst->ln_lost_dirent_repaired =
178                                 cpu_to_le64(src->ln_lost_dirent_repaired);
179 }
180
181 static void lfsck_namespace_record_failure(const struct lu_env *env,
182                                            struct lfsck_instance *lfsck,
183                                            struct lfsck_namespace *ns)
184 {
185         struct lfsck_position pos;
186
187         ns->ln_items_failed++;
188         lfsck_pos_fill(env, lfsck, &pos, false);
189         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
190             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
191                 ns->ln_pos_first_inconsistent = pos;
192
193                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
194                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
195                        lfsck_lfsck2name(lfsck),
196                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
197                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
198                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
199         }
200 }
201
202 /**
203  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
204  * \retval 0: succeed.
205  * \retval -ve: failed cases.
206  */
207 static int lfsck_namespace_load(const struct lu_env *env,
208                                 struct lfsck_component *com)
209 {
210         int len = com->lc_file_size;
211         int rc;
212
213         rc = dt_xattr_get(env, com->lc_obj,
214                           lfsck_buf_get(env, com->lc_file_disk, len),
215                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
216         if (rc == len) {
217                 struct lfsck_namespace *ns = com->lc_file_ram;
218
219                 lfsck_namespace_le_to_cpu(ns,
220                                 (struct lfsck_namespace *)com->lc_file_disk);
221                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
222                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
223                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
224                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
225                         rc = 1;
226                 } else {
227                         rc = 0;
228                 }
229         } else if (rc != -ENODATA) {
230                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
231                        "expected = %d: rc = %d\n",
232                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
233                 if (rc >= 0)
234                         rc = 1;
235         }
236         return rc;
237 }
238
239 static int lfsck_namespace_store(const struct lu_env *env,
240                                  struct lfsck_component *com, bool init)
241 {
242         struct dt_object        *obj    = com->lc_obj;
243         struct lfsck_instance   *lfsck  = com->lc_lfsck;
244         struct thandle          *handle;
245         int                      len    = com->lc_file_size;
246         int                      rc;
247         ENTRY;
248
249         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
250                                   (struct lfsck_namespace *)com->lc_file_ram);
251         handle = dt_trans_create(env, lfsck->li_bottom);
252         if (IS_ERR(handle))
253                 GOTO(log, rc = PTR_ERR(handle));
254
255         rc = dt_declare_xattr_set(env, obj,
256                                   lfsck_buf_get(env, com->lc_file_disk, len),
257                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
258         if (rc != 0)
259                 GOTO(out, rc);
260
261         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
262         if (rc != 0)
263                 GOTO(out, rc);
264
265         rc = dt_xattr_set(env, obj,
266                           lfsck_buf_get(env, com->lc_file_disk, len),
267                           XATTR_NAME_LFSCK_NAMESPACE,
268                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
269                           handle, BYPASS_CAPA);
270
271         GOTO(out, rc);
272
273 out:
274         dt_trans_stop(env, lfsck->li_bottom, handle);
275
276 log:
277         if (rc != 0)
278                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
279                        lfsck_lfsck2name(lfsck), rc);
280         return rc;
281 }
282
283 static int lfsck_namespace_init(const struct lu_env *env,
284                                 struct lfsck_component *com)
285 {
286         struct lfsck_namespace *ns = com->lc_file_ram;
287         int rc;
288
289         memset(ns, 0, sizeof(*ns));
290         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
291         ns->ln_status = LS_INIT;
292         down_write(&com->lc_sem);
293         rc = lfsck_namespace_store(env, com, true);
294         up_write(&com->lc_sem);
295         return rc;
296 }
297
298 /**
299  * Update the namespace LFSCK tracing file for the given @fid
300  *
301  * \param[in] env       pointer to the thread context
302  * \param[in] com       pointer to the lfsck component
303  * \param[in] fid       the fid which flags to be updated in the lfsck
304  *                      tracing file
305  * \param[in] add       true if add new flags, otherwise remove flags
306  *
307  * \retval              0 for succeed or nothing to be done
308  * \retval              negative error number on failure
309  */
310 int lfsck_namespace_trace_update(const struct lu_env *env,
311                                  struct lfsck_component *com,
312                                  const struct lu_fid *fid,
313                                  const __u8 flags, bool add)
314 {
315         struct lfsck_instance   *lfsck  = com->lc_lfsck;
316         struct dt_object        *obj    = com->lc_obj;
317         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
318         struct dt_device        *dev    = lfsck->li_bottom;
319         struct thandle          *th     = NULL;
320         int                      rc     = 0;
321         __u8                     old    = 0;
322         __u8                     new    = 0;
323         ENTRY;
324
325         LASSERT(flags != 0);
326
327         down_write(&com->lc_sem);
328         fid_cpu_to_be(key, fid);
329         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
330                        (const struct dt_key *)key, BYPASS_CAPA);
331         if (rc == -ENOENT) {
332                 if (!add)
333                         GOTO(unlock, rc = 0);
334
335                 old = 0;
336                 new = flags;
337         } else if (rc == 0) {
338                 if (add) {
339                         if ((old & flags) == flags)
340                                 GOTO(unlock, rc = 0);
341
342                         new = old | flags;
343                 } else {
344                         if ((old & flags) == 0)
345                                 GOTO(unlock, rc = 0);
346
347                         new = old & ~flags;
348                 }
349         } else {
350                 GOTO(log, rc);
351         }
352
353         th = dt_trans_create(env, dev);
354         if (IS_ERR(th))
355                 GOTO(log, rc = PTR_ERR(th));
356
357         if (old != 0) {
358                 rc = dt_declare_delete(env, obj,
359                                        (const struct dt_key *)key, th);
360                 if (rc != 0)
361                         GOTO(log, rc);
362         }
363
364         if (new != 0) {
365                 rc = dt_declare_insert(env, obj,
366                                        (const struct dt_rec *)&new,
367                                        (const struct dt_key *)key, th);
368                 if (rc != 0)
369                         GOTO(log, rc);
370         }
371
372         rc = dt_trans_start_local(env, dev, th);
373         if (rc != 0)
374                 GOTO(log, rc);
375
376         if (old != 0) {
377                 rc = dt_delete(env, obj, (const struct dt_key *)key,
378                                th, BYPASS_CAPA);
379                 if (rc != 0)
380                         GOTO(log, rc);
381         }
382
383         if (new != 0) {
384                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
385                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
386                 if (rc != 0)
387                         GOTO(log, rc);
388         }
389
390         GOTO(log, rc);
391
392 log:
393         if (th != NULL && !IS_ERR(th))
394                 dt_trans_stop(env, dev, th);
395
396         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
397                "tracing file, flags %x, old %x, new %x: rc = %d\n",
398                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
399                (__u32)flags, (__u32)old, (__u32)new, rc);
400
401 unlock:
402         up_write(&com->lc_sem);
403
404         return rc;
405 }
406
407 static int lfsck_namespace_check_exist(const struct lu_env *env,
408                                        struct dt_object *dir,
409                                        struct dt_object *obj, const char *name)
410 {
411         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
412         int               rc;
413         ENTRY;
414
415         if (unlikely(lfsck_is_dead_obj(obj)))
416                 RETURN(LFSCK_NAMEENTRY_DEAD);
417
418         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
419                        (const struct dt_key *)name, BYPASS_CAPA);
420         if (rc == -ENOENT)
421                 RETURN(LFSCK_NAMEENTRY_REMOVED);
422
423         if (rc < 0)
424                 RETURN(rc);
425
426         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
427                 RETURN(LFSCK_NAMEENTRY_RECREATED);
428
429         RETURN(0);
430 }
431
432 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
433                                             struct dt_object *obj,
434                                             struct thandle *handle)
435 {
436         int rc;
437
438         /* For destroying all invalid linkEA entries. */
439         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
440         if (rc != 0)
441                 return rc;
442
443         /* For insert new linkEA entry. */
444         rc = dt_declare_xattr_set(env, obj,
445                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
446                         XATTR_NAME_LINK, 0, handle);
447         return rc;
448 }
449
450 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
451                        struct linkea_data *ldata)
452 {
453         int rc;
454
455         if (ldata->ld_buf->lb_buf == NULL)
456                 return -ENOMEM;
457
458         if (!dt_object_exists(obj))
459                 return -ENOENT;
460
461         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
462         if (rc == -ERANGE) {
463                 /* Buf was too small, figure out what we need. */
464                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
465                                   BYPASS_CAPA);
466                 if (rc <= 0)
467                         return rc;
468
469                 lu_buf_realloc(ldata->ld_buf, rc);
470                 if (ldata->ld_buf->lb_buf == NULL)
471                         return -ENOMEM;
472
473                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
474                                   BYPASS_CAPA);
475         }
476
477         if (rc > 0)
478                 rc = linkea_init(ldata);
479
480         return rc;
481 }
482
483 /**
484  * Remove linkEA for the given object.
485  *
486  * The caller should take the ldlm lock before the calling.
487  *
488  * \param[in] env       pointer to the thread context
489  * \param[in] com       pointer to the lfsck component
490  * \param[in] obj       pointer to the dt_object to be handled
491  *
492  * \retval              0 for repaired cases
493  * \retval              negative error number on failure
494  */
495 static int lfsck_namespace_links_remove(const struct lu_env *env,
496                                         struct lfsck_component *com,
497                                         struct dt_object *obj)
498 {
499         struct lfsck_instance           *lfsck  = com->lc_lfsck;
500         struct dt_device                *dev    = lfsck->li_bottom;
501         struct thandle                  *th     = NULL;
502         int                              rc     = 0;
503         ENTRY;
504
505         LASSERT(dt_object_remote(obj) == 0);
506
507         th = dt_trans_create(env, dev);
508         if (IS_ERR(th))
509                 GOTO(log, rc = PTR_ERR(th));
510
511         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
512         if (rc != 0)
513                 GOTO(stop, rc);
514
515         rc = dt_trans_start_local(env, dev, th);
516         if (rc != 0)
517                 GOTO(stop, rc);
518
519         dt_write_lock(env, obj, 0);
520         if (unlikely(lfsck_is_dead_obj(obj)))
521                 GOTO(unlock, rc = -ENOENT);
522
523         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
524                 GOTO(unlock, rc = 0);
525
526         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
527
528         GOTO(unlock, rc);
529
530 unlock:
531         dt_write_unlock(env, obj);
532
533 stop:
534         dt_trans_stop(env, dev, th);
535
536 log:
537         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
538                "for the object "DFID": rc = %d\n",
539                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
540
541         if (rc == 0) {
542                 struct lfsck_namespace *ns = com->lc_file_ram;
543
544                 ns->ln_flags |= LF_INCONSISTENT;
545         }
546
547         return rc;
548 }
549
550 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
551                              struct linkea_data *ldata, struct thandle *handle)
552 {
553         const struct lu_buf *buf = lfsck_buf_get_const(env,
554                                                        ldata->ld_buf->lb_buf,
555                                                        ldata->ld_leh->leh_len);
556
557         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
558                             BYPASS_CAPA);
559 }
560
561 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
562                                                 struct lu_name *cname,
563                                                 struct lu_fid *pfid,
564                                                 char *buf)
565 {
566         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
567         /* To guarantee the 'name' is terminated with '0'. */
568         memcpy(buf, cname->ln_name, cname->ln_namelen);
569         buf[cname->ln_namelen] = 0;
570         cname->ln_name = buf;
571 }
572
573 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
574                                                struct lu_name *cname,
575                                                struct lu_fid *pfid,
576                                                bool remove)
577 {
578         struct link_ea_entry    *oldlee;
579         int                      oldlen;
580         int                      repeated = 0;
581
582         oldlee = ldata->ld_lee;
583         oldlen = ldata->ld_reclen;
584         linkea_next_entry(ldata);
585         while (ldata->ld_lee != NULL) {
586                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
587                                    ldata->ld_lee->lee_reclen[1];
588                 if (unlikely(ldata->ld_reclen == oldlen &&
589                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
590                         repeated++;
591                         if (!remove)
592                                 break;
593
594                         linkea_del_buf(ldata, cname);
595                 } else {
596                         linkea_next_entry(ldata);
597                 }
598         }
599         ldata->ld_lee = oldlee;
600         ldata->ld_reclen = oldlen;
601
602         return repeated;
603 }
604
605 /**
606  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
607  *
608  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
609  * with the given type to generate the name, the detailed rules for name
610  * have been described as following.
611  *
612  * The function also generates the linkEA corresponding to the name entry
613  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
614  *
615  * \param[in] env       pointer to the thread context
616  * \param[in] com       pointer to the lfsck component
617  * \param[in] orphan    pointer to the orphan MDT-object
618  * \param[in] infix     additional information for the orphan name, such as
619  *                      the FID for original
620  * \param[in] type      the type for describing why the orphan MDT-object is
621  *                      created. The rules are as following:
622  *
623  *  type "D":           The MDT-object is a directory, it may know its parent
624  *                      but because there is no valid linkEA, the LFSCK cannot
625  *                      know where to put it back to the namespace.
626  *  type "O":           The MDT-object has no linkEA, and there is no name
627  *                      entry that references the MDT-object.
628  *
629  * \see lfsck_layout_recreate_parent() for more types.
630  *
631  * The orphan name will be like:
632  * ${FID}-${infix}-${type}-${conflict_version}
633  *
634  * \param[out] count    if some others inserted some linkEA entries by race,
635  *                      then return the linkEA entries count.
636  *
637  * \retval              positive number for repaired cases
638  * \retval              0 if needs to repair nothing
639  * \retval              negative error number on failure
640  */
641 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
642                                          struct lfsck_component *com,
643                                          struct dt_object *orphan,
644                                          const char *infix, const char *type,
645                                          int *count)
646 {
647         struct lfsck_thread_info        *info   = lfsck_env_info(env);
648         struct lu_name                  *cname  = &info->lti_name;
649         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
650         struct lu_fid                   *tfid   = &info->lti_fid5;
651         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
652         const struct lu_fid             *pfid;
653         struct lfsck_instance           *lfsck  = com->lc_lfsck;
654         struct dt_device                *dev    = lfsck->li_bottom;
655         struct dt_object                *parent;
656         struct thandle                  *th     = NULL;
657         struct lustre_handle             plh    = { 0 };
658         struct lustre_handle             clh    = { 0 };
659         struct linkea_data               ldata  = { 0 };
660         struct lu_buf                    linkea_buf;
661         int                              namelen;
662         int                              idx    = 0;
663         int                              rc     = 0;
664         bool                             exist  = false;
665         ENTRY;
666
667         cname->ln_name = NULL;
668         /* Create .lustre/lost+found/MDTxxxx when needed. */
669         if (unlikely(lfsck->li_lpf_obj == NULL)) {
670                 rc = lfsck_create_lpf(env, lfsck);
671                 if (rc != 0)
672                         GOTO(log, rc);
673         }
674
675         parent = lfsck->li_lpf_obj;
676         pfid = lfsck_dto2fid(parent);
677
678         /* Hold update lock on the parent to prevent others to access. */
679         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
680                               MDS_INODELOCK_UPDATE, LCK_EX);
681         if (rc != 0)
682                 GOTO(log, rc);
683
684         do {
685                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
686                                    PFID(cfid), infix, type, idx++);
687                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
688                                (const struct dt_key *)info->lti_key,
689                                BYPASS_CAPA);
690                 if (rc != 0 && rc != -ENOENT)
691                         GOTO(log, rc);
692
693                 if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
694                         exist = true;
695         } while (rc == 0 && !exist);
696
697         cname->ln_name = info->lti_key;
698         cname->ln_namelen = namelen;
699         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
700         if (rc != 0)
701                 GOTO(log, rc);
702
703         rc = linkea_add_buf(&ldata, cname, pfid);
704         if (rc != 0)
705                 GOTO(log, rc);
706
707         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
708                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
709                               LCK_EX);
710         if (rc != 0)
711                 GOTO(log, rc);
712
713         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
714                        ldata.ld_leh->leh_len);
715         th = dt_trans_create(env, dev);
716         if (IS_ERR(th))
717                 GOTO(log, rc = PTR_ERR(th));
718
719         if (S_ISDIR(lfsck_object_type(orphan))) {
720                 rc = dt_declare_delete(env, orphan,
721                                        (const struct dt_key *)dotdot, th);
722                 if (rc != 0)
723                         GOTO(stop, rc);
724
725                 rec->rec_type = S_IFDIR;
726                 rec->rec_fid = pfid;
727                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
728                                        (const struct dt_key *)dotdot, th);
729                 if (rc != 0)
730                         GOTO(stop, rc);
731         }
732
733         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
734                                   XATTR_NAME_LINK, 0, th);
735         if (rc != 0)
736                 GOTO(stop, rc);
737
738         if (!exist) {
739                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
740                 rec->rec_fid = cfid;
741                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
742                                        (const struct dt_key *)cname->ln_name,
743                                        th);
744                 if (rc != 0)
745                         GOTO(stop, rc);
746
747                 if (S_ISDIR(rec->rec_type)) {
748                         rc = dt_declare_ref_add(env, parent, th);
749                         if (rc != 0)
750                                 GOTO(stop, rc);
751                 }
752         }
753
754         rc = dt_trans_start_local(env, dev, th);
755         if (rc != 0)
756                 GOTO(stop, rc);
757
758         dt_write_lock(env, orphan, 0);
759         rc = lfsck_links_read(env, orphan, &ldata);
760         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
761                    (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
762                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
763                         GOTO(unlock, rc = 1);
764
765                 if (S_ISDIR(lfsck_object_type(orphan))) {
766                         rc = dt_delete(env, orphan,
767                                        (const struct dt_key *)dotdot, th,
768                                        BYPASS_CAPA);
769                         if (rc != 0)
770                                 GOTO(unlock, rc);
771
772                         rec->rec_type = S_IFDIR;
773                         rec->rec_fid = pfid;
774                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
775                                        (const struct dt_key *)dotdot, th,
776                                        BYPASS_CAPA, 1);
777                         if (rc != 0)
778                                 GOTO(unlock, rc);
779                 }
780
781                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
782                                   th, BYPASS_CAPA);
783         } else {
784                 if (rc == 0 && count != NULL)
785                         *count = ldata.ld_leh->leh_reccount;
786
787                 GOTO(unlock, rc);
788         }
789         dt_write_unlock(env, orphan);
790
791         if (rc == 0 && !exist) {
792                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
793                 rec->rec_fid = cfid;
794                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
795                                (const struct dt_key *)cname->ln_name,
796                                th, BYPASS_CAPA, 1);
797                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
798                         dt_write_lock(env, parent, 0);
799                         rc = dt_ref_add(env, parent, th);
800                         dt_write_unlock(env, parent);
801                 }
802         }
803
804         GOTO(stop, rc = (rc == 0 ? 1 : rc));
805
806 unlock:
807         dt_write_unlock(env, orphan);
808
809 stop:
810         dt_trans_stop(env, dev, th);
811
812 log:
813         lfsck_ibits_unlock(&clh, LCK_EX);
814         lfsck_ibits_unlock(&plh, LCK_EX);
815         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
816                "object "DFID", name = %s: rc = %d\n",
817                lfsck_lfsck2name(lfsck), PFID(cfid),
818                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
819
820         if (rc != 0) {
821                 struct lfsck_namespace *ns = com->lc_file_ram;
822
823                 ns->ln_flags |= LF_INCONSISTENT;
824         }
825
826         return rc;
827 }
828
829 /**
830  * Add the specified name entry back to namespace.
831  *
832  * If there is a linkEA entry that back references a name entry under
833  * some parent directory, but such parent directory does not have the
834  * claimed name entry. On the other hand, the linkEA entries count is
835  * not larger than the MDT-object's hard link count. Under such case,
836  * it is quite possible that the name entry is lost. Then the LFSCK
837  * should add the name entry back to the namespace.
838  *
839  * \param[in] env       pointer to the thread context
840  * \param[in] com       pointer to the lfsck component
841  * \param[in] parent    pointer to the directory under which the name entry
842  *                      will be inserted into
843  * \param[in] child     pointer to the object referenced by the name entry
844  *                      that to be inserted into the parent
845  * \param[in] name      the name for the child in the parent directory
846  *
847  * \retval              positive number for repaired cases
848  * \retval              0 if nothing to be repaired
849  * \retval              negative error number on failure
850  */
851 static int lfsck_namespace_insert_normal(const struct lu_env *env,
852                                          struct lfsck_component *com,
853                                          struct dt_object *parent,
854                                          struct dt_object *child,
855                                          const char *name)
856 {
857         struct lfsck_thread_info        *info   = lfsck_env_info(env);
858         struct lu_attr                  *la     = &info->lti_la;
859         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
860         struct lfsck_instance           *lfsck  = com->lc_lfsck;
861         struct dt_device                *dev    = lfsck->li_next;
862         struct thandle                  *th     = NULL;
863         struct lustre_handle             lh     = { 0 };
864         int                              rc     = 0;
865         ENTRY;
866
867         if (unlikely(!dt_try_as_dir(env, parent)))
868                 GOTO(log, rc = -ENOTDIR);
869
870         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
871                 GOTO(log, rc = 1);
872
873         /* Hold update lock on the parent to prevent others to access. */
874         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
875                               MDS_INODELOCK_UPDATE, LCK_EX);
876         if (rc != 0)
877                 GOTO(log, rc);
878
879         th = dt_trans_create(env, dev);
880         if (IS_ERR(th))
881                 GOTO(unlock, rc = PTR_ERR(th));
882
883         rec->rec_type = lfsck_object_type(child) & S_IFMT;
884         rec->rec_fid = lfsck_dto2fid(child);
885         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
886                                (const struct dt_key *)name, th);
887         if (rc != 0)
888                 GOTO(stop, rc);
889
890         if (S_ISDIR(rec->rec_type)) {
891                 rc = dt_declare_ref_add(env, parent, th);
892                 if (rc != 0)
893                         GOTO(stop, rc);
894         }
895
896         memset(la, 0, sizeof(*la));
897         la->la_ctime = cfs_time_current_sec();
898         la->la_valid = LA_CTIME;
899         rc = dt_declare_attr_set(env, parent, la, th);
900         if (rc != 0)
901                 GOTO(stop, rc);
902
903         rc = dt_trans_start_local(env, dev, th);
904         if (rc != 0)
905                 GOTO(stop, rc);
906
907         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
908                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
909         if (rc != 0)
910                 GOTO(stop, rc);
911
912         if (S_ISDIR(rec->rec_type)) {
913                 dt_write_lock(env, parent, 0);
914                 rc = dt_ref_add(env, parent, th);
915                 dt_write_unlock(env, parent);
916                 if (rc != 0)
917                         GOTO(stop, rc);
918         }
919
920         la->la_ctime = cfs_time_current_sec();
921         rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
922
923         GOTO(stop, rc = (rc == 0 ? 1 : rc));
924
925 stop:
926         dt_trans_stop(env, dev, th);
927
928 unlock:
929         lfsck_ibits_unlock(&lh, LCK_EX);
930
931 log:
932         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
933                "the name %s and type %o to the parent "DFID": rc = %d\n",
934                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
935                lfsck_object_type(child) & S_IFMT,
936                PFID(lfsck_dto2fid(parent)), rc);
937
938         if (rc != 0) {
939                 struct lfsck_namespace *ns = com->lc_file_ram;
940
941                 ns->ln_flags |= LF_INCONSISTENT;
942                 if (rc > 0)
943                         ns->ln_lost_dirent_repaired++;
944         }
945
946         return rc;
947 }
948
949 /**
950  * Create the specified orphan MDT-object on remote MDT.
951  *
952  * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
953  * ask the remote LFSCK instance to create the specified orphan object
954  * under .lustre/lost+found/MDTxxxx/ directory with the name:
955  * ${FID}-P-${conflict_version}.
956  *
957  * \param[in] env       pointer to the thread context
958  * \param[in] com       pointer to the lfsck component
959  * \param[in] orphan    pointer to the orphan MDT-object
960  * \param[in] type      the orphan's type to be created
961  *
962  *  type "P":           The orphan object to be created was a parent directory
963  *                      of some DMT-object which linkEA shows that the @orphan
964  *                      object is missing.
965  *
966  * \see lfsck_layout_recreate_parent() for more types.
967  *
968  * \retval              positive number for repaired cases
969  * \retval              0 if needs to repair nothing
970  * \retval              negative error number on failure
971  */
972 static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
973                                                 struct lfsck_component *com,
974                                                 struct dt_object *orphan,
975                                                 __u32 type)
976 {
977         struct lfsck_thread_info        *info   = lfsck_env_info(env);
978         struct lfsck_request            *lr     = &info->lti_lr;
979         struct lu_seq_range             *range  = &info->lti_range;
980         const struct lu_fid             *fid    = lfsck_dto2fid(orphan);
981         struct lfsck_namespace          *ns     = com->lc_file_ram;
982         struct lfsck_instance           *lfsck  = com->lc_lfsck;
983         struct seq_server_site          *ss     =
984                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
985         struct lfsck_tgt_desc           *ltd    = NULL;
986         struct ptlrpc_request           *req    = NULL;
987         int                              rc;
988         ENTRY;
989
990         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
991                 GOTO(out, rc = 1);
992
993         fld_range_set_mdt(range);
994         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
995         if (rc != 0)
996                 GOTO(out, rc);
997
998         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
999         if (ltd == NULL) {
1000                 ns->ln_flags |= LF_INCOMPLETE;
1001
1002                 GOTO(out, rc = -ENODEV);
1003         }
1004
1005         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1006                                    &RQF_LFSCK_NOTIFY);
1007         if (req == NULL)
1008                 GOTO(out, rc = -ENOMEM);
1009
1010         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1011         if (rc != 0) {
1012                 ptlrpc_request_free(req);
1013
1014                 GOTO(out, rc);
1015         }
1016
1017         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1018         memset(lr, 0, sizeof(*lr));
1019         lr->lr_event = LE_CREATE_ORPHAN;
1020         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1021         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1022         lr->lr_fid = *fid;
1023         lr->lr_type = type;
1024
1025         ptlrpc_request_set_replen(req);
1026         rc = ptlrpc_queue_wait(req);
1027         ptlrpc_req_finished(req);
1028
1029         if (rc == 0)
1030                 rc = 1;
1031         else if (rc == -EEXIST)
1032                 rc = 0;
1033
1034         GOTO(out, rc);
1035
1036 out:
1037         CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
1038                DFID" on the MDT %x remotely: rc = %d\n",
1039                lfsck_lfsck2name(lfsck), PFID(fid),
1040                ltd != NULL ? ltd->ltd_index : -1, rc);
1041
1042         if (ltd != NULL)
1043                 lfsck_tgt_put(ltd);
1044
1045         return rc;
1046 }
1047
1048 /**
1049  * Create the specified orphan MDT-object locally.
1050  *
1051  * For the case that the parent MDT-object stored in some MDT-object's
1052  * linkEA entry is lost, the LFSCK will re-create the parent object as
1053  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1054  * with the name ${FID}-P-${conflict_version}.
1055  *
1056  * \param[in] env       pointer to the thread context
1057  * \param[in] com       pointer to the lfsck component
1058  * \param[in] orphan    pointer to the orphan MDT-object to be created
1059  * \param[in] type      the orphan's type to be created
1060  *
1061  *  type "P":           The orphan object to be created was a parent directory
1062  *                      of some DMT-object which linkEA shows that the @orphan
1063  *                      object is missing.
1064  *
1065  * \see lfsck_layout_recreate_parent() for more types.
1066  *
1067  * \retval              positive number for repaired cases
1068  * \retval              negative error number on failure
1069  */
1070 static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
1071                                                struct lfsck_component *com,
1072                                                struct dt_object *orphan,
1073                                                __u32 type)
1074 {
1075         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1076         struct lu_attr                  *la     = &info->lti_la;
1077         struct dt_allocation_hint       *hint   = &info->lti_hint;
1078         struct dt_object_format         *dof    = &info->lti_dof;
1079         struct lu_name                  *cname  = &info->lti_name2;
1080         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1081         struct lu_fid                   *tfid   = &info->lti_fid;
1082         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1083         const struct lu_fid             *pfid;
1084         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1085         struct dt_device                *dev    = lfsck->li_bottom;
1086         struct dt_object                *parent = NULL;
1087         struct dt_object                *child  = NULL;
1088         struct thandle                  *th     = NULL;
1089         struct lustre_handle             lh     = { 0 };
1090         struct linkea_data               ldata  = { 0 };
1091         struct lu_buf                    linkea_buf;
1092         char                             name[32];
1093         int                              namelen;
1094         int                              idx    = 0;
1095         int                              rc     = 0;
1096         ENTRY;
1097
1098         LASSERT(!dt_object_exists(orphan));
1099         LASSERT(!dt_object_remote(orphan));
1100
1101         /* @orphan maybe not attached to lfsck->li_bottom */
1102         child = lfsck_object_find_by_dev(env, dev, cfid);
1103         if (IS_ERR(child))
1104                 GOTO(log, rc = PTR_ERR(child));
1105
1106         cname->ln_name = NULL;
1107         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1108                 GOTO(log, rc = 1);
1109
1110         /* Create .lustre/lost+found/MDTxxxx when needed. */
1111         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1112                 rc = lfsck_create_lpf(env, lfsck);
1113                 if (rc != 0)
1114                         GOTO(log, rc);
1115         }
1116
1117         parent = lfsck->li_lpf_obj;
1118         pfid = lfsck_dto2fid(parent);
1119
1120         /* Hold update lock on the parent to prevent others to access. */
1121         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1122                               MDS_INODELOCK_UPDATE, LCK_EX);
1123         if (rc != 0)
1124                 GOTO(log, rc);
1125
1126         do {
1127                 namelen = snprintf(name, 31, DFID"-P-%d",
1128                                    PFID(cfid), idx++);
1129                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1130                                (const struct dt_key *)name, BYPASS_CAPA);
1131                 if (rc != 0 && rc != -ENOENT)
1132                         GOTO(unlock1, rc);
1133         } while (rc == 0);
1134
1135         cname->ln_name = name;
1136         cname->ln_namelen = namelen;
1137
1138         memset(la, 0, sizeof(*la));
1139         la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
1140         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1141                        LA_ATIME | LA_MTIME | LA_CTIME;
1142
1143         child->do_ops->do_ah_init(env, hint, parent, child,
1144                                   la->la_mode & S_IFMT);
1145
1146         memset(dof, 0, sizeof(*dof));
1147         dof->dof_type = dt_mode_to_dft(type);
1148
1149         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1150         if (rc != 0)
1151                 GOTO(unlock1, rc);
1152
1153         rc = linkea_add_buf(&ldata, cname, pfid);
1154         if (rc != 0)
1155                 GOTO(unlock1, rc);
1156
1157         th = dt_trans_create(env, dev);
1158         if (IS_ERR(th))
1159                 GOTO(unlock1, rc = PTR_ERR(th));
1160
1161         rc = dt_declare_create(env, child, la, hint, dof, th);
1162         if (rc == 0 && S_ISDIR(type))
1163                 rc = dt_declare_ref_add(env, child, th);
1164
1165         if (rc != 0)
1166                 GOTO(stop, rc);
1167
1168         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1169                        ldata.ld_leh->leh_len);
1170         rc = dt_declare_xattr_set(env, child, &linkea_buf,
1171                                   XATTR_NAME_LINK, 0, th);
1172         if (rc != 0)
1173                 GOTO(stop, rc);
1174
1175         rec->rec_type = type;
1176         rec->rec_fid = cfid;
1177         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1178                                (const struct dt_key *)name, th);
1179         if (rc == 0 && S_ISDIR(type))
1180                 rc = dt_declare_ref_add(env, parent, th);
1181
1182         if (rc != 0)
1183                 GOTO(stop, rc);
1184
1185         rc = dt_trans_start_local(env, dev, th);
1186         if (rc != 0)
1187                 GOTO(stop, rc);
1188
1189         dt_write_lock(env, child, 0);
1190         rc = dt_create(env, child, la, hint, dof, th);
1191         if (rc != 0)
1192                 GOTO(unlock2, rc);
1193
1194         if (S_ISDIR(type)) {
1195                 if (unlikely(!dt_try_as_dir(env, child)))
1196                         GOTO(unlock2, rc = -ENOTDIR);
1197
1198                 rec->rec_type = S_IFDIR;
1199                 rec->rec_fid = cfid;
1200                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1201                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
1202                 if (rc != 0)
1203                         GOTO(unlock2, rc);
1204
1205                 rec->rec_fid = pfid;
1206                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1207                                (const struct dt_key *)dotdot, th,
1208                                BYPASS_CAPA, 1);
1209                 if (rc != 0)
1210                         GOTO(unlock2, rc);
1211
1212                 rc = dt_ref_add(env, child, th);
1213                 if (rc != 0)
1214                         GOTO(unlock2, rc);
1215         }
1216
1217         rc = dt_xattr_set(env, child, &linkea_buf,
1218                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1219         dt_write_unlock(env, child);
1220         if (rc != 0)
1221                 GOTO(stop, rc);
1222
1223         rec->rec_type = type;
1224         rec->rec_fid = cfid;
1225         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1226                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1227         if (rc == 0 && S_ISDIR(type)) {
1228                 dt_write_lock(env, parent, 0);
1229                 rc = dt_ref_add(env, parent, th);
1230                 dt_write_unlock(env, parent);
1231         }
1232
1233         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1234
1235 unlock2:
1236         dt_write_unlock(env, child);
1237
1238 stop:
1239         dt_trans_stop(env, dev, th);
1240
1241 unlock1:
1242         lfsck_ibits_unlock(&lh, LCK_EX);
1243
1244 log:
1245         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
1246                "the object "DFID", name = %s, type %o: rc = %d\n",
1247                lfsck_lfsck2name(lfsck), PFID(cfid),
1248                cname->ln_name != NULL ? cname->ln_name : "<NULL>", type, rc);
1249
1250         if (child != NULL && !IS_ERR(child))
1251                 lfsck_object_put(env, child);
1252
1253         return rc;
1254 }
1255
1256 /**
1257  * Create the specified orphan MDT-object.
1258  *
1259  * For the case that the parent MDT-object stored in some MDT-object's
1260  * linkEA entry is lost, the LFSCK will re-create the parent object as
1261  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1262  * with the name: ${FID}-P-${conflict_version}.
1263  *
1264  * \param[in] env       pointer to the thread context
1265  * \param[in] com       pointer to the lfsck component
1266  * \param[in] orphan    pointer to the orphan MDT-object
1267  *
1268  *  type "P":           The orphan object to be created was a parent directory
1269  *                      of some DMT-object which linkEA shows that the @orphan
1270  *                      object is missing.
1271  *
1272  * \see lfsck_layout_recreate_parent() for more types.
1273  *
1274  * \retval              positive number for repaired cases
1275  * \retval              0 if needs to repair nothing
1276  * \retval              negative error number on failure
1277  */
1278 static int lfsck_namespace_create_orphan(const struct lu_env *env,
1279                                          struct lfsck_component *com,
1280                                          struct dt_object *orphan)
1281 {
1282         struct lfsck_namespace *ns = com->lc_file_ram;
1283         int                     rc;
1284
1285         if (dt_object_remote(orphan))
1286                 rc = lfsck_namespace_create_orphan_remote(env, com, orphan,
1287                                                           S_IFDIR);
1288         else
1289                 rc = lfsck_namespace_create_orphan_local(env, com, orphan,
1290                                                          S_IFDIR);
1291
1292         if (rc != 0)
1293                 ns->ln_flags |= LF_INCONSISTENT;
1294
1295         return rc;
1296 }
1297
1298 /**
1299  * Remove the specified entry from the linkEA.
1300  *
1301  * Locate the linkEA entry with the given @cname and @pfid, then
1302  * remove this entry or the other entries those are repeated with
1303  * this entry.
1304  *
1305  * \param[in] env       pointer to the thread context
1306  * \param[in] com       pointer to the lfsck component
1307  * \param[in] obj       pointer to the dt_object to be handled
1308  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1309  * \param[in] cname     the name for the child in the parent directory
1310  * \param[in] pfid      the parent directory's FID for the linkEA
1311  * \param[in] next      if true, then remove the first found linkEA
1312  *                      entry, and move the ldata->ld_lee to next entry
1313  *
1314  * \retval              positive number for repaired cases
1315  * \retval              0 if nothing to be repaired
1316  * \retval              negative error number on failure
1317  */
1318 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1319                                          struct lfsck_component *com,
1320                                          struct dt_object *obj,
1321                                          struct linkea_data *ldata,
1322                                          struct lu_name *cname,
1323                                          struct lu_fid *pfid,
1324                                          bool next)
1325 {
1326         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1327         struct dt_device                *dev       = lfsck->li_bottom;
1328         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1329         struct thandle                  *th        = NULL;
1330         struct lustre_handle             lh        = { 0 };
1331         struct linkea_data               ldata_new = { 0 };
1332         struct lu_buf                    linkea_buf;
1333         int                              rc        = 0;
1334         ENTRY;
1335
1336         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1337                               MDS_INODELOCK_UPDATE |
1338                               MDS_INODELOCK_XATTR, LCK_EX);
1339         if (rc != 0)
1340                 GOTO(log, rc);
1341
1342         if (next)
1343                 linkea_del_buf(ldata, cname);
1344         else
1345                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1346                                                     true);
1347         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1348                        ldata->ld_leh->leh_len);
1349
1350 again:
1351         th = dt_trans_create(env, dev);
1352         if (IS_ERR(th))
1353                 GOTO(unlock1, rc = PTR_ERR(th));
1354
1355         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1356                                   XATTR_NAME_LINK, 0, th);
1357         if (rc != 0)
1358                 GOTO(stop, rc);
1359
1360         rc = dt_trans_start_local(env, dev, th);
1361         if (rc != 0)
1362                 GOTO(stop, rc);
1363
1364         dt_write_lock(env, obj, 0);
1365         if (unlikely(lfsck_is_dead_obj(obj)))
1366                 GOTO(unlock2, rc = -ENOENT);
1367
1368         rc = lfsck_links_read2(env, obj, &ldata_new);
1369         if (rc != 0)
1370                 GOTO(unlock2, rc);
1371
1372         /* The specified linkEA entry has been removed by race. */
1373         rc = linkea_links_find(&ldata_new, cname, pfid);
1374         if (rc != 0)
1375                 GOTO(unlock2, rc = 0);
1376
1377         if (bk->lb_param & LPF_DRYRUN)
1378                 GOTO(unlock2, rc = 1);
1379
1380         if (next)
1381                 linkea_del_buf(&ldata_new, cname);
1382         else
1383                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1384                                                     true);
1385
1386         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1387                 dt_write_unlock(env, obj);
1388                 dt_trans_stop(env, dev, th);
1389                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1390                                ldata_new.ld_leh->leh_len);
1391                 goto again;
1392         }
1393
1394         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1395                        ldata_new.ld_leh->leh_len);
1396         rc = dt_xattr_set(env, obj, &linkea_buf,
1397                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1398
1399         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1400
1401 unlock2:
1402         dt_write_unlock(env, obj);
1403
1404 stop:
1405         dt_trans_stop(env, dev, th);
1406
1407 unlock1:
1408         lfsck_ibits_unlock(&lh, LCK_EX);
1409
1410 log:
1411         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1412                "for the object: "DFID", parent "DFID", name %.*s\n",
1413                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1414                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1415                cname->ln_name);
1416
1417         if (rc != 0) {
1418                 struct lfsck_namespace *ns = com->lc_file_ram;
1419
1420                 ns->ln_flags |= LF_INCONSISTENT;
1421         }
1422
1423         return rc;
1424 }
1425
1426 /**
1427  * Conditionally remove the specified entry from the linkEA.
1428  *
1429  * Take the parent lock firstly, then check whether the specified
1430  * name entry exists or not: if yes, do nothing; otherwise, call
1431  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1432  *
1433  * \param[in] env       pointer to the thread context
1434  * \param[in] com       pointer to the lfsck component
1435  * \param[in] parent    pointer to the parent directory
1436  * \param[in] child     pointer to the child object that holds the linkEA
1437  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1438  * \param[in] cname     the name for the child in the parent directory
1439  * \param[in] pfid      the parent directory's FID for the linkEA
1440  *
1441  * \retval              positive number for repaired cases
1442  * \retval              0 if nothing to be repaired
1443  * \retval              negative error number on failure
1444  */
1445 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1446                                               struct lfsck_component *com,
1447                                               struct dt_object *parent,
1448                                               struct dt_object *child,
1449                                               struct linkea_data *ldata,
1450                                               struct lu_name *cname,
1451                                               struct lu_fid *pfid)
1452 {
1453         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
1454         struct lustre_handle     lh     = { 0 };
1455         int                      rc;
1456         ENTRY;
1457
1458         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1459                               MDS_INODELOCK_UPDATE, LCK_EX);
1460         if (rc != 0)
1461                 RETURN(rc);
1462
1463         dt_read_lock(env, parent, 0);
1464         if (unlikely(lfsck_is_dead_obj(parent))) {
1465                 dt_read_unlock(env, parent);
1466                 lfsck_ibits_unlock(&lh, LCK_EX);
1467                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1468                                                    cname, pfid, true);
1469
1470                 RETURN(rc);
1471         }
1472
1473         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1474                        (const struct dt_key *)cname->ln_name,
1475                        BYPASS_CAPA);
1476         dt_read_unlock(env, parent);
1477
1478         /* It is safe to release the ldlm lock, because when the logic come
1479          * here, we have got all the needed information above whether the
1480          * linkEA entry is valid or not. It is not important that others
1481          * may add new linkEA entry after the ldlm lock released. If other
1482          * has removed the specified linkEA entry by race, then it is OK,
1483          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1484          * such case. */
1485         lfsck_ibits_unlock(&lh, LCK_EX);
1486         if (rc == -ENOENT) {
1487                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1488                                                    cname, pfid, true);
1489
1490                 RETURN(rc);
1491         }
1492
1493         if (rc != 0)
1494                 RETURN(rc);
1495
1496         /* The LFSCK just found some internal status of cross-MDTs
1497          * create operation. That is normal. */
1498         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1499                 linkea_next_entry(ldata);
1500
1501                 RETURN(0);
1502         }
1503
1504         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1505                                            pfid, true);
1506
1507         RETURN(rc);
1508 }
1509
1510 /**
1511  * Conditionally replace name entry in the parent.
1512  *
1513  * As required, the LFSCK may re-create the lost MDT-object for dangling
1514  * name entry, but such repairing may be wrong because of bad FID in the
1515  * name entry. As the LFSCK processing, the real MDT-object may be found,
1516  * then the LFSCK should check whether the former re-created MDT-object
1517  * has been modified or not, if not, then destroy it and update the name
1518  * entry in the parent to reference the real MDT-object.
1519  *
      * The checks on the object referenced by @cfid are skipped, and nothing
      * is destroyed, when @cfid is not sane, the object does not exist or is
      * dead, or the name entry is no longer in the parent. Otherwise the
      * object is destroyed only if the name entry still references @cfid,
      * the object's ctime is zero and, for a regular file, it has no LOV EA.
      * In every case the name entry is only replaced if the linkEA of @child
      * still contains the pair (@cname, FID of @parent); if not, 0 is
      * returned without changing anything.
      *
      * The parent is locked with MDS_INODELOCK_UPDATE in LCK_EX mode for the
      * whole call; the object to be destroyed is locked with
      * MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR once it is examined. Both
      * locks are released before returning.
      *
1520  * \param[in] env       pointer to the thread context
1521  * \param[in] com       pointer to the lfsck component
1522  * \param[in] parent    pointer to the parent directory
1523  * \param[in] child     pointer to the MDT-object that may be the real
1524  *                      MDT-object corresponding to the name entry in parent
1525  * \param[in] cfid      the current FID in the name entry
1526  * \param[in] cname     contains the name of the child in the parent directory
1527  *
1528  * \retval              positive number for repaired cases
      *                      (1, also returned in dry-run mode)
1529  * \retval              0 if nothing to be repaired
1530  * \retval              negative error number on failure
1531  */
1532 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1533                                         struct lfsck_component *com,
1534                                         struct dt_object *parent,
1535                                         struct dt_object *child,
1536                                         const struct lu_fid *cfid,
1537                                         const struct lu_name *cname)
1538 {
1539         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1540         struct lu_fid                   *tfid   = &info->lti_fid5;
1541         struct lu_attr                  *la     = &info->lti_la;
1542         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1543         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1544         struct dt_device                *dev    = lfsck->li_next;
1545         const char                      *name   = cname->ln_name;
1546         struct dt_object                *obj    = NULL;
1547         struct lustre_handle             plh    = { 0 };
1548         struct lustre_handle             clh    = { 0 };
1549         struct linkea_data               ldata  = { 0 };
1550         struct thandle                  *th     = NULL;
             /* Whether the object named by @cfid has to be destroyed. */
1551         bool                             exist  = true;
1552         int                              rc     = 0;
1553         ENTRY;
1554
             /* Keep the parent locked across both the checks and the
              * replacement; the lock is dropped at the "log" label. */
1555         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1556                               MDS_INODELOCK_UPDATE, LCK_EX);
1557         if (rc != 0)
1558                 GOTO(log, rc);
1559
             /* No object can be looked up via an insane FID, so there is
              * nothing to destroy. */
1560         if (!fid_is_sane(cfid)) {
1561                 exist = false;
1562                 goto replace;
1563         }
1564
1565         obj = lfsck_object_find(env, lfsck, cfid);
1566         if (IS_ERR(obj)) {
1567                 rc = PTR_ERR(obj);
1568                 if (rc == -ENOENT) {
1569                         exist = false;
1570                         goto replace;
1571                 }
1572
1573                 GOTO(log, rc);
1574         }
1575
1576         if (!dt_object_exists(obj)) {
1577                 exist = false;
1578                 goto replace;
1579         }
1580
             /* Re-read the name entry under the parent lock. */
1581         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1582                        (const struct dt_key *)name, BYPASS_CAPA);
1583         if (rc == -ENOENT) {
1584                 exist = false;
1585                 goto replace;
1586         }
1587
1588         if (rc != 0)
1589                 GOTO(log, rc);
1590
1591         /* Someone changed the name entry, cannot replace it. */
1592         if (!lu_fid_eq(cfid, tfid))
1593                 GOTO(log, rc = 0);
1594
1595         /* lock the object to be destroyed. */
1596         rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1597                               MDS_INODELOCK_UPDATE |
1598                               MDS_INODELOCK_XATTR, LCK_EX);
1599         if (rc != 0)
1600                 GOTO(log, rc);
1601
1602         if (unlikely(lfsck_is_dead_obj(obj))) {
1603                 exist = false;
1604                 goto replace;
1605         }
1606
1607         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1608         if (rc != 0)
1609                 GOTO(log, rc);
1610
1611         /* The object has been modified by other(s), or it is not created by
1612          * LFSCK, the two cases are indistinguishable. So cannot replace it. */
             /* rc is 0 here, i.e. "nothing to be repaired" is returned. */
1613         if (la->la_ctime != 0)
1614                 GOTO(log, rc);
1615
1616         if (S_ISREG(la->la_mode)) {
                     /* NOTE(review): with LU_BUF_NULL this presumably only
                      * queries the size of the LOV EA -- confirm. */
1617                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1618                                   BYPASS_CAPA);
1619                 /* If someone has created related OST-object(s),
1620                  * then keep it. */
                     /* A positive result is reported as 0 (nothing repaired);
                      * errors other than -ENODATA are returned as they are. */
1621                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1622                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1623         }
1624
1625 replace:
             /* Verify via the child's linkEA that the child really claims
              * the name @cname under @parent. */
1626         dt_read_lock(env, child, 0);
1627         rc = lfsck_links_read2(env, child, &ldata);
1628         dt_read_unlock(env, child);
1629
1630         /* Someone changed the child, no need to replace. */
1631         if (rc == -ENODATA)
1632                 GOTO(log, rc = 0);
1633
1634         if (rc != 0)
1635                 GOTO(log, rc);
1636
1637         rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1638         /* Someone moved the child, no need to replace. */
1639         if (rc != 0)
1640                 GOTO(log, rc = 0);
1641
             /* Dry-run mode: report the case as repaired, modify nothing. */
1642         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1643                 GOTO(log, rc = 1);
1644
1645         th = dt_trans_create(env, dev);
1646         if (IS_ERR(th))
1647                 GOTO(log, rc = PTR_ERR(th));
1648
1649         if (exist) {
1650                 rc = dt_declare_destroy(env, obj, th);
1651                 if (rc != 0)
1652                         GOTO(stop, rc);
1653         }
1654
1655         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1656         if (rc != 0)
1657                 GOTO(stop, rc);
1658
             /* The new name entry is always inserted with directory type.
              * NOTE(review): this assumes @child is a directory -- confirm
              * against the callers. */
1659         rec->rec_type = S_IFDIR;
1660         rec->rec_fid = lfsck_dto2fid(child);
1661         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1662                                (const struct dt_key *)name, th);
1663         if (rc != 0)
1664                 GOTO(stop, rc);
1665
1666         rc = dt_trans_start(env, dev, th);
1667         if (rc != 0)
1668                 GOTO(stop, rc);
1669
1670         if (exist) {
1671                 rc = dt_destroy(env, obj, th);
1672                 if (rc != 0)
1673                         GOTO(stop, rc);
1674         }
1675
1676         /* The old name entry may not exist, so the result of dt_delete()
              * is deliberately ignored. */
1677         dt_delete(env, parent, (const struct dt_key *)name, th,
1678                   BYPASS_CAPA);
1679
1680         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1681                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1682
             /* Success is reported as 1 ("repaired"). */
1683         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1684
1685 stop:
1686         dt_trans_stop(env, dev, th);
1687
1688 log:
             /* clh is still zeroed if the object lock was never taken. */
1689         lfsck_ibits_unlock(&clh, LCK_EX);
1690         lfsck_ibits_unlock(&plh, LCK_EX);
1691         if (obj != NULL && !IS_ERR(obj))
1692                 lfsck_object_put(env, obj);
1693
1694         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1695                "object "DFID" because of conflict with the object "DFID
1696                " under the parent "DFID" with name %s: rc = %d\n",
1697                lfsck_lfsck2name(lfsck), PFID(cfid),
1698                PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1699                name, rc);
1700
1701         return rc;
1702 }
1703
1704 /**
1705  * Overwrite the linkEA for the object with the given ldata.
1706  *
1707  * The caller should take the ldlm lock before the calling.
1708  *
      * The object must be local (asserted). The XATTR_NAME_LINK xattr is set
      * from @ldata's buffer, using the length recorded in its header
      * (ld_leh->leh_len), inside a transaction on lfsck->li_bottom and under
      * the object's write lock. In dry-run mode nothing is written, but 1 is
      * still returned. Any non-zero result (repaired or failed) sets
      * LF_INCONSISTENT in the namespace LFSCK flags.
      *
1709  * \param[in] env       pointer to the thread context
1710  * \param[in] com       pointer to the lfsck component
1711  * \param[in] obj       pointer to the dt_object to be handled
1712  * \param[in] ldata     pointer to the new linkEA data
1713  *
1714  * \retval              positive number for repaired cases
1715  * \retval              0 if nothing to be repaired
      *                      (the object is already dead)
1716  * \retval              negative error number on failure
1717  */
1718 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1719                                    struct lfsck_component *com,
1720                                    struct dt_object *obj,
1721                                    struct linkea_data *ldata)
1722 {
1723         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1724         struct dt_device                *dev    = lfsck->li_bottom;
1725         struct thandle                  *th     = NULL;
1726         struct lu_buf                    linkea_buf;
1727         int                              rc     = 0;
1728         ENTRY;
1729
1730         LASSERT(!dt_object_remote(obj));
1731
1732         th = dt_trans_create(env, dev);
1733         if (IS_ERR(th))
1734                 GOTO(log, rc = PTR_ERR(th));
1735
             /* Wrap the caller's linkEA buffer; only leh_len bytes of it
              * are declared and written. */
1736         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1737                        ldata->ld_leh->leh_len);
1738         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1739                                   XATTR_NAME_LINK, 0, th);
1740         if (rc != 0)
1741                 GOTO(stop, rc);
1742
1743         rc = dt_trans_start_local(env, dev, th);
1744         if (rc != 0)
1745                 GOTO(stop, rc);
1746
1747         dt_write_lock(env, obj, 0);
             /* Re-check under the write lock: a dead object is left alone. */
1748         if (unlikely(lfsck_is_dead_obj(obj)))
1749                 GOTO(unlock, rc = 0);
1750
             /* Dry-run mode: report as repaired without writing. */
1751         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1752                 GOTO(unlock, rc = 1);
1753
1754         rc = dt_xattr_set(env, obj, &linkea_buf,
1755                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1756
             /* Success is reported as 1 ("repaired"). */
1757         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1758
1759 unlock:
1760         dt_write_unlock(env, obj);
1761
1762 stop:
1763         dt_trans_stop(env, dev, th);
1764
1765 log:
1766         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1767                "object "DFID": rc = %d\n",
1768                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1769
             /* Both a repair (rc > 0) and a failure (rc < 0) flag the
              * namespace as inconsistent. */
1770         if (rc != 0) {
1771                 struct lfsck_namespace *ns = com->lc_file_ram;
1772
1773                 ns->ln_flags |= LF_INCONSISTENT;
1774         }
1775
1776         return rc;
1777 }
1778
1779 /**
1780  * Repair invalid name entry.
1781  *
1782  * If the name entry contains invalid information, such as bad file type
1783  * or (and) corrupted object FID, then either remove the name entry or
1784  * update the name entry with the given (right) information.
1785  *
      * The parent is locked with MDS_INODELOCK_UPDATE in LCK_EX mode, and the
      * name entry is looked up again under the parent's write lock inside the
      * transaction: the repair is only applied if @name still references
      * @child. In dry-run mode nothing is modified, but 1 is still returned.
      * Any non-zero result (repaired or failed) sets LF_INCONSISTENT in the
      * namespace LFSCK flags.
      *
1786  * \param[in] env       pointer to the thread context
1787  * \param[in] com       pointer to the lfsck component
1788  * \param[in] parent    pointer to the parent directory
1789  * \param[in] child     pointer to the object referenced by the name entry
1790  * \param[in] name      the old name of the child under the parent directory
1791  * \param[in] name2     the new name of the child under the parent directory
      *                      (only used as insert key when @update is true)
1792  * \param[in] type      the type claimed by the name entry
      *                      (only used in the debug message)
1793  * \param[in] update    update the name entry if true; otherwise, remove it
1794  * \param[in] dec       decrease the parent nlink count if true
1795  *
1796  * \retval              positive number for repaired successfully
1797  * \retval              0 if nothing to be repaired
      *                      (the name entry is gone or references another
      *                      object now)
1798  * \retval              negative error number on failure
1799  */
1800 int lfsck_namespace_repair_dirent(const struct lu_env *env,
1801                                   struct lfsck_component *com,
1802                                   struct dt_object *parent,
1803                                   struct dt_object *child,
1804                                   const char *name, const char *name2,
1805                                   __u16 type, bool update, bool dec)
1806 {
1807         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1808         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1809         const struct lu_fid             *cfid   = lfsck_dto2fid(child);
1810         struct lu_fid                   *tfid   = &info->lti_fid5;
1811         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1812         struct dt_device                *dev    = lfsck->li_next;
1813         struct thandle                  *th     = NULL;
1814         struct lustre_handle             lh     = { 0 };
1815         int                              rc     = 0;
1816         ENTRY;
1817
1818         if (unlikely(!dt_try_as_dir(env, parent)))
1819                 GOTO(log, rc = -ENOTDIR);
1820
1821         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1822                               MDS_INODELOCK_UPDATE, LCK_EX);
1823         if (rc != 0)
1824                 GOTO(log, rc);
1825
1826         th = dt_trans_create(env, dev);
1827         if (IS_ERR(th))
1828                 GOTO(unlock1, rc = PTR_ERR(th));
1829
1830         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1831         if (rc != 0)
1832                 GOTO(stop, rc);
1833
1834         if (update) {
                     /* The replacement entry takes its type from the child
                      * object and is keyed by @name2. */
1835                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
1836                 rec->rec_fid = cfid;
1837                 rc = dt_declare_insert(env, parent,
1838                                        (const struct dt_rec *)rec,
1839                                        (const struct dt_key *)name2, th);
1840                 if (rc != 0)
1841                         GOTO(stop, rc);
1842         }
1843
1844         if (dec) {
1845                 rc = dt_declare_ref_del(env, parent, th);
1846                 if (rc != 0)
1847                         GOTO(stop, rc);
1848         }
1849
1850         rc = dt_trans_start(env, dev, th);
1851         if (rc != 0)
1852                 GOTO(stop, rc);
1853
1854         dt_write_lock(env, parent, 0);
             /* Re-check the bad name entry under the parent's write lock. */
1855         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1856                        (const struct dt_key *)name, BYPASS_CAPA);
1857         /* Someone has removed the bad name entry by race. */
1858         if (rc == -ENOENT)
1859                 GOTO(unlock2, rc = 0);
1860
1861         if (rc != 0)
1862                 GOTO(unlock2, rc);
1863
1864         /* Someone has removed the bad name entry and reused it for other
1865          * object by race. */
1866         if (!lu_fid_eq(tfid, cfid))
1867                 GOTO(unlock2, rc = 0);
1868
             /* Dry-run mode: report as repaired without modifying. */
1869         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1870                 GOTO(unlock2, rc = 1);
1871
1872         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
1873                        BYPASS_CAPA);
1874         if (rc != 0)
1875                 GOTO(unlock2, rc);
1876
1877         if (update) {
1878                 rc = dt_insert(env, parent,
1879                                (const struct dt_rec *)rec,
1880                                (const struct dt_key *)name2, th,
1881                                BYPASS_CAPA, 1);
1882                 if (rc != 0)
1883                         GOTO(unlock2, rc);
1884         }
1885
1886         if (dec) {
1887                 rc = dt_ref_del(env, parent, th);
1888                 if (rc != 0)
1889                         GOTO(unlock2, rc);
1890         }
1891
             /* Success is reported as 1 ("repaired"). */
1892         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1893
1894 unlock2:
1895         dt_write_unlock(env, parent);
1896
1897 stop:
1898         dt_trans_stop(env, dev, th);
1899
1900         /* We are not sure whether the child will become orphan or not.
1901          * Record it in the LFSCK tracing file for further checking in
1902          * the second-stage scanning. */
             /* NOTE(review): rc == 0 is only reached here when the entry was
              * found removed or re-targeted by race; a successful removal
              * yields rc = 1 and therefore skips this trace update. Confirm
              * that this is the intended condition. */
1903         if (!update && !dec && rc == 0)
1904                 lfsck_namespace_trace_update(env, com, cfid,
1905                                              LNTF_CHECK_LINKEA, true);
1906
1907 unlock1:
1908         lfsck_ibits_unlock(&lh, LCK_EX);
1909
1910 log:
1911         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
1912                "entry for: parent "DFID", child "DFID", name %s, type "
1913                "in name entry %o, type claimed by child %o. repair it "
1914                "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
1915                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
1916                name, type, update ? lfsck_object_type(child) : 0,
1917                update ? "updating" : "removing", name2, rc);
1918
             /* Both a repair (rc > 0) and a failure (rc < 0) flag the
              * namespace as inconsistent. */
1919         if (rc != 0) {
1920                 struct lfsck_namespace *ns = com->lc_file_ram;
1921
1922                 ns->ln_flags |= LF_INCONSISTENT;
1923         }
1924
1925         return rc;
1926 }
1927
1928 /**
1929  * Update the ".." name entry for the given object.
1930  *
1931  * The object's ".." is corrupted, this function will update the ".." name
1932  * entry with the given pfid, and overwrite the linkEA with a new one that
      * holds the single entry (@cname, @pfid).
1933  *
1934  * The caller should take the ldlm lock before the calling.
1935  *
      * The object must be a local directory (asserted). Both changes are made
      * in one transaction on lfsck->li_bottom under the object's write lock.
      * In dry-run mode nothing is modified, but 1 is still returned. Any
      * non-zero result (repaired or failed) sets LF_INCONSISTENT in the
      * namespace LFSCK flags.
      *
1936  * \param[in] env       pointer to the thread context
1937  * \param[in] com       pointer to the lfsck component
1938  * \param[in] obj       pointer to the dt_object to be handled
1939  * \param[in] pfid      the new fid for the object's ".." name entry
1940  * \param[in] cname     the name for the @obj in the parent directory
1941  *
1942  * \retval              positive number for repaired cases
1943  * \retval              0 if nothing to be repaired
      *                      (the object is already dead)
1944  * \retval              negative error number on failure
1945  */
1946 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
1947                                                   struct lfsck_component *com,
1948                                                   struct dt_object *obj,
1949                                                   const struct lu_fid *pfid,
1950                                                   struct lu_name *cname)
1951 {
1952         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1953         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1954         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1955         struct dt_device                *dev    = lfsck->li_bottom;
1956         struct thandle                  *th     = NULL;
1957         struct linkea_data               ldata  = { 0 };
1958         struct lu_buf                    linkea_buf;
1959         int                              rc     = 0;
1960         ENTRY;
1961
1962         LASSERT(!dt_object_remote(obj));
1963         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1964
             /* Build a fresh linkEA in the per-thread big buffer that holds
              * only the (cname, pfid) entry. */
1965         rc = linkea_data_new(&ldata, &info->lti_big_buf);
1966         if (rc != 0)
1967                 GOTO(log, rc);
1968
1969         rc = linkea_add_buf(&ldata, cname, pfid);
1970         if (rc != 0)
1971                 GOTO(log, rc);
1972
1973         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1974                        ldata.ld_leh->leh_len);
1975
1976         th = dt_trans_create(env, dev);
1977         if (IS_ERR(th))
1978                 GOTO(log, rc = PTR_ERR(th));
1979
             /* NOTE(review): "dotdot" is defined outside this function;
              * presumably it is the ".." key -- confirm. */
1980         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
1981         if (rc != 0)
1982                 GOTO(stop, rc);
1983
1984         rec->rec_type = S_IFDIR;
1985         rec->rec_fid = pfid;
1986         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
1987                                (const struct dt_key *)dotdot, th);
1988         if (rc != 0)
1989                 GOTO(stop, rc);
1990
1991         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1992                                   XATTR_NAME_LINK, 0, th);
1993         if (rc != 0)
1994                 GOTO(stop, rc);
1995
1996         rc = dt_trans_start_local(env, dev, th);
1997         if (rc != 0)
1998                 GOTO(stop, rc);
1999
2000         dt_write_lock(env, obj, 0);
             /* Re-check under the write lock: a dead object is left alone. */
2001         if (unlikely(lfsck_is_dead_obj(obj)))
2002                 GOTO(unlock, rc = 0);
2003
             /* Dry-run mode: report as repaired without modifying. */
2004         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2005                 GOTO(unlock, rc = 1);
2006
2007         /* The old ".." name entry may not exist, so the result of
              * dt_delete() is deliberately ignored. */
2008         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
2009                   BYPASS_CAPA);
2010
2011         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2012                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
2013         if (rc != 0)
2014                 GOTO(unlock, rc);
2015
2016         rc = dt_xattr_set(env, obj, &linkea_buf,
2017                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2018
             /* Success is reported as 1 ("repaired"). */
2019         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2020
2021 unlock:
2022         dt_write_unlock(env, obj);
2023
2024 stop:
2025         dt_trans_stop(env, dev, th);
2026
2027 log:
2028         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2029                "the object "DFID", new parent "DFID": rc = %d\n",
2030                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2031                PFID(pfid), rc);
2032
             /* Both a repair (rc > 0) and a failure (rc < 0) flag the
              * namespace as inconsistent. */
2033         if (rc != 0) {
2034                 struct lfsck_namespace *ns = com->lc_file_ram;
2035
2036                 ns->ln_flags |= LF_INCONSISTENT;
2037         }
2038
2039         return rc;
2040 }
2041
2042 /**
2043  * Handle orphan @obj during Double Scan Directory.
2044  *
2045  * Remove the @obj's current (invalid) linkEA entries, and insert
2046  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2047  * ${FID}-${PFID}-D-${conflict_version}
2048  *
2049  * The caller should take the ldlm lock before the calling.
      * The lock referenced by @lh is released here right after the linkEA
      * removal, whether the removal succeeded or not.
2050  *
2051  * \param[in] env       pointer to the thread context
2052  * \param[in] com       pointer to the lfsck component
2053  * \param[in] obj       pointer to the orphan object to be handled
2054  * \param[in] pfid      the FID used as the ${PFID} part of the orphan's
      *                      name (it is only formatted into the name here)
2055  * \param[in,out] lh    ldlm lock handler for the given @obj
2056  * \param[out] type     to tell the caller what the inconsistency is
      *                      (set to LNIT_MUL_REF unless the linkEA removal
      *                      fails)
2057  *
2058  * \retval              positive number for repaired cases
2059  * \retval              0 if nothing to be repaired
2060  * \retval              negative error number on failure
2061  */
2062 static int
2063 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2064                            struct lfsck_component *com,
2065                            struct dt_object *obj,
2066                            const struct lu_fid *pfid,
2067                            struct lustre_handle *lh,
2068                            enum lfsck_namespace_inconsistency_type *type)
2069 {
2070         struct lfsck_thread_info *info = lfsck_env_info(env);
2071         int                       rc;
2072         ENTRY;
2073
2074         /* Remove the unrecognized linkEA. */
2075         rc = lfsck_namespace_links_remove(env, com, obj);
2076         lfsck_ibits_unlock(lh, LCK_EX);
             /* -ENODATA is tolerated; any other failure aborts the repair. */
2077         if (rc < 0 && rc != -ENODATA)
2078                 RETURN(rc);
2079
2080         *type = LNIT_MUL_REF;
2081         /* The unique linkEA is invalid, even if the ".." name entry may be
2082          * valid, we still cannot know via which name entry this directory
2083          * will be referenced. Then handle it as pure orphan. */
             /* Build the "-${PFID}" name suffix in the per-thread buffer. */
2084         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2085                  "-"DFID, PFID(pfid));
2086         rc = lfsck_namespace_insert_orphan(env, com, obj,
2087                                            info->lti_tmpbuf, "D", NULL);
2088
2089         RETURN(rc);
2090 }
2091
2092 /**
2093  * Double Scan Directory object for single linkEA entry case.
2094  *
2095  * The given @child has unique linkEA entry. If the linkEA entry is valid,
2096  * then check whether the name is in the namespace or not, if not, add the
2097  * missing name entry back to namespace. If the linkEA entry is invalid,
2098  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2099  * as an orphan.
2100  *
2101  * \param[in] env       pointer to the thread context
2102  * \param[in] com       pointer to the lfsck component
2103  * \param[in] child     pointer to the directory to be double scanned
2104  * \param[in] pfid      the FID corresponding to the ".." entry
2105  * \param[in] ldata     pointer to the linkEA data for the given @child
2106  * \param[in,out] lh    ldlm lock handler for the given @child
2107  * \param[out] type     to tell the caller what the inconsistency is
2108  * \param[in] retry     if found inconsistency, but the caller does not hold
2109  *                      ldlm lock on the @child, then set @retry as true
2110  *
2111  * \retval              positive number for repaired cases
2112  * \retval              0 if nothing to be repaired
2113  * \retval              negative error number on failure
2114  */
2115 static int
2116 lfsck_namespace_dsd_single(const struct lu_env *env,
2117                            struct lfsck_component *com,
2118                            struct dt_object *child,
2119                            const struct lu_fid *pfid,
2120                            struct linkea_data *ldata,
2121                            struct lustre_handle *lh,
2122                            enum lfsck_namespace_inconsistency_type *type,
2123                            bool *retry)
2124 {
2125         struct lfsck_thread_info *info          = lfsck_env_info(env);
2126         struct lu_name           *cname         = &info->lti_name;
2127         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2128         struct lu_fid            *tfid          = &info->lti_fid3;
2129         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2130         struct dt_object         *parent        = NULL;
2131         int                       rc            = 0;
2132         ENTRY;
2133
2134         lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
2135         /* The unique linkEA entry with bad parent will be handled as orphan. */
2136         if (!fid_is_sane(tfid)) {
2137                 if (!lustre_handle_is_used(lh) && retry != NULL)
2138                         *retry = true;
2139                 else
2140                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2141                                                         pfid, lh, type);
2142
2143                 GOTO(out, rc);
2144         }
2145
2146         parent = lfsck_object_find_bottom(env, lfsck, tfid);
2147         if (IS_ERR(parent))
2148                 GOTO(out, rc = PTR_ERR(parent));
2149
2150         /* We trust the unique linkEA entry in spite of whether it matches the
2151          * ".." name entry or not. Because even if the linkEA entry is wrong
2152          * and the ".." name entry is right, we still cannot know via which
2153          * name entry the child will be referenced, since all known entries
2154          * have been verified during the first-stage scanning. */
2155         if (!dt_object_exists(parent)) {
2156                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2157                         *retry = true;
2158
2159                         GOTO(out, rc = 0);
2160                 }
2161
2162                 lfsck_ibits_unlock(lh, LCK_EX);
2163
2164 lost_parent:
2165                 /* Create the lost parent as an orphan. */
2166                 rc = lfsck_namespace_create_orphan(env, com, parent);
2167                 if (rc >= 0) {
2168                         /* Add the missing name entry to the parent. */
2169                         rc = lfsck_namespace_insert_normal(env, com, parent,
2170                                                         child, cname->ln_name);
2171                         if (unlikely(rc == -EEXIST)) {
2172                                 /* Unfortunately, someone reused the name
2173                                  * under the parent by race. So we have
2174                                  * to remove the linkEA entry from
2175                                  * current child object. It means that the
2176                                  * LFSCK cannot recover the system
2177                                  * totally back to its original status,
2178                                  * but it is necessary to make the
2179                                  * current system to be consistent. */
2180                                 rc = lfsck_namespace_shrink_linkea(env,
2181                                                 com, child, ldata,
2182                                                 cname, tfid, true);
2183                                 if (rc >= 0) {
2184                                         snprintf(info->lti_tmpbuf,
2185                                                  sizeof(info->lti_tmpbuf),
2186                                                  "-"DFID, PFID(pfid));
2187                                         rc = lfsck_namespace_insert_orphan(env,
2188                                                 com, child, info->lti_tmpbuf,
2189                                                 "D", NULL);
2190                                 }
2191                         }
2192                 }
2193
2194                 GOTO(out, rc);
2195         }
2196
2197         /* The unique linkEA entry with bad parent will be handled as orphan. */
2198         if (unlikely(!dt_try_as_dir(env, parent))) {
2199                 if (!lustre_handle_is_used(lh) && retry != NULL)
2200                         *retry = true;
2201                 else
2202                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2203                                                         pfid, lh, type);
2204
2205                 GOTO(out, rc);
2206         }
2207
2208         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2209                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
2210         if (rc == -ENOENT) {
2211                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2212                         *retry = true;
2213
2214                         GOTO(out, rc = 0);
2215                 }
2216
2217                 lfsck_ibits_unlock(lh, LCK_EX);
2218                 /* Add the missing name entry back to the namespace. */
2219                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2220                                                    cname->ln_name);
2221                 if (unlikely(rc == -ESTALE))
2222                         /* It may happen when the remote object has been
2223                          * removed, but the local MDT is not aware of that. */
2224                         goto lost_parent;
2225
2226                 if (unlikely(rc == -EEXIST)) {
2227                         /* Unfortunately, someone reused the name under the
2228                          * parent by race. So we have to remove the linkEA
2229                          * entry from current child object. It means that the
2230                          * LFSCK cannot recover the system totally back to
2231                          * its original status, but it is necessary to make
2232                          * the current system to be consistent.
2233                          *
2234                          * It also may be because of the LFSCK found some
2235                          * internal status of create operation. Under such
2236                          * case, nothing to be done. */
2237                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2238                                         parent, child, ldata, cname, tfid);
2239                         if (rc >= 0) {
2240                                 snprintf(info->lti_tmpbuf,
2241                                          sizeof(info->lti_tmpbuf),
2242                                          "-"DFID, PFID(pfid));
2243                                 rc = lfsck_namespace_insert_orphan(env, com,
2244                                         child, info->lti_tmpbuf, "D", NULL);
2245                         }
2246                 }
2247
2248                 GOTO(out, rc);
2249         }
2250
2251         if (rc != 0)
2252                 GOTO(out, rc);
2253
2254         if (!lu_fid_eq(tfid, cfid)) {
2255                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2256                         *retry = true;
2257
2258                         GOTO(out, rc = 0);
2259                 }
2260
2261                 lfsck_ibits_unlock(lh, LCK_EX);
2262                 /* The name entry references another MDT-object that
2263                  * may be created by the LFSCK for repairing dangling
2264                  * name entry. Try to replace it. */
2265                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2266                                                   tfid, cname);
2267                 if (rc == 0)
2268                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2269                                                         pfid, lh, type);
2270
2271                 GOTO(out, rc);
2272         }
2273
2274         /* The ".." name entry is wrong, update it. */
2275         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2276                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2277                         *retry = true;
2278
2279                         GOTO(out, rc = 0);
2280                 }
2281
2282                 *type = LNIT_UNMATCHED_PAIRS;
2283                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2284                                                 lfsck_dto2fid(parent), cname);
2285         }
2286
2287         GOTO(out, rc);
2288
2289 out:
2290         if (parent != NULL && !IS_ERR(parent))
2291                 lfsck_object_put(env, parent);
2292
2293         return rc;
2294 }
2295
2296 /**
2297  * Double Scan Directory object for multiple linkEA entries case.
2298  *
2299  * The given @child has multiple linkEA entries. At most one linkEA entry
2300  * will be valid, all the others will be removed. Firstly, the function
2301  * will try to find out the linkEA entry for which the name entry exists under
2302  * the given parent (@pfid). If there is no linkEA entry that matches the given
2303  * ".." name entry, then tries to find out the first linkEA entry that both the
2304  * parent and the name entry exist to rebuild a new ".." name entry.
      *
      * The linkEA buffer may be scanned up to three times (see the "again"
      * label):
      * - if @lpf is true, the first scan examines every entry; @lpf is then
      *   cleared and the scan restarts;
      * - while the local flag "once" is true (and @lpf is false), only the
      *   entries whose parent FID equals @pfid are examined, the others are
      *   skipped but kept in the buffer;
      * - with "once" cleared, every remaining entry is examined.
      * A rescan only happens when more than one entry is left after a scan:
      * with exactly one entry left the work is handed over to
      * lfsck_namespace_dsd_single(), with none left to
      * lfsck_namespace_dsd_orphan().
      *
      * NOTE(review): after the inner loop that removes the name entries under
      * the other parents, the value returned is whatever the last iteration of
      * that loop left in "rc", not the result of
      * lfsck_namespace_rebuild_linkea(). Likewise the final RETURN(rc) appears
      * able to carry the -ENOENT of the last dt_lookup() when no entry could be
      * matched. Confirm that the callers expect this.
2305  *
2306  * \param[in] env       pointer to the thread context
2307  * \param[in] com       pointer to the lfsck component
2308  * \param[in] child     pointer to the directory to be double scanned
2309  * \param[in] pfid      the FID corresponding to the ".." entry
2310  * \param[in] ldata     pointer to the linkEA data for the given @child
      *                      (entries are dropped from this buffer with
      *                      linkea_del_buf() while it is scanned)
2311  * \param[in,out] lh    ldlm lock handler for the given @child
      *                      (released with lfsck_ibits_unlock() before
      *                      lfsck_namespace_replace_cond() is called)
2312  * \param[out] type     to tell the caller what the inconsistency is
      *                      (set to LNIT_UNMATCHED_PAIRS here when the ".."
      *                      entry is repaired; also passed on to the
      *                      dsd_single/dsd_orphan helpers)
2313  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
2314  *
2315  * \retval              positive number for repaired cases
2316  * \retval              0 if nothing to be repaired
2317  * \retval              negative error number on failure
2318  */
2319 static int
2320 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2321                              struct lfsck_component *com,
2322                              struct dt_object *child,
2323                              const struct lu_fid *pfid,
2324                              struct linkea_data *ldata,
2325                              struct lustre_handle *lh,
2326                              enum lfsck_namespace_inconsistency_type *type,
2327                              bool lpf)
2328 {
2329         struct lfsck_thread_info *info          = lfsck_env_info(env);
2330         struct lu_name           *cname         = &info->lti_name;
2331         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2332         struct lu_fid            *tfid          = &info->lti_fid3;
2333         struct lu_fid            *pfid2         = &info->lti_fid4;
2334         struct lfsck_namespace   *ns            = com->lc_file_ram;
2335         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2336         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2337         struct dt_object         *parent        = NULL;
2338         struct linkea_data        ldata_new     = { 0 };
2339         int                       count         = 0;
2340         int                       rc            = 0;
2341         bool                      once          = true;
2342         ENTRY;
2343
2344 again:
2345         while (ldata->ld_lee != NULL) {
2346                 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
2347                                                     info->lti_key);
2348                 /* Drop repeated linkEA entries. */
2349                 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
2350                 /* Drop invalid linkEA entry. */
2351                 if (!fid_is_sane(tfid)) {
2352                         linkea_del_buf(ldata, cname);
2353                         continue;
2354                 }
2355
2356                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2357                  * then it is possible that: the directory object has ever
2358                  * been lost, but its name entry was there. In the former
2359                  * LFSCK run, during the first-stage scanning, the LFSCK
2360                  * found the dangling name entry, but it did not recreate
2361                  * the lost object, and when moved to the second-stage
2362                  * scanning, some children objects of the lost directory
2363                  * object were found, then the LFSCK recreated such lost
2364                  * directory object as an orphan.
2365                  *
2366                  * When the LFSCK runs again, if the dangling name is still
2367                  * there, the LFSCK should move the orphan directory object
2368                  * back to the normal namespace. */
                     /* Unless @lpf is set, the scan done while "once" is still
                      * true only examines the entries whose parent FID equals
                      * @pfid; the other entries are skipped here, not removed. */
2369                 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
2370                         linkea_next_entry(ldata);
2371                         continue;
2372                 }
2373
2374                 parent = lfsck_object_find_bottom(env, lfsck, tfid);
2375                 if (IS_ERR(parent))
2376                         RETURN(PTR_ERR(parent));
2377
2378                 if (!dt_object_exists(parent)) {
2379                         lfsck_object_put(env, parent);
2380                         if (ldata->ld_leh->leh_reccount > 1) {
2381                                 /* If it is NOT the last linkEA entry, then
2382                                  * there is still other chance to make the
2383                                  * child to be visible via other parent, then
2384                                  * remove this linkEA entry. */
2385                                 linkea_del_buf(ldata, cname);
2386                                 continue;
2387                         }
2388
                             /* No other entry is left: stop scanning and let
                              * the record-count checks after the loop decide
                              * how the remaining entry is handled. */
2389                         break;
2390                 }
2391
2392                 /* The linkEA entry with bad parent will be removed. */
2393                 if (unlikely(!dt_try_as_dir(env, parent))) {
2394                         lfsck_object_put(env, parent);
2395                         linkea_del_buf(ldata, cname);
2396                         continue;
2397                 }
2398
                     /* dt_lookup() is handed tfid as its record buffer; the
                      * comparison with cfid below relies on it holding the FID
                      * stored under the name. pfid2 keeps the FID of the parent
                      * being checked, so it stays usable after the parent
                      * reference has been put. */
2399                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2400                                (const struct dt_key *)cname->ln_name,
2401                                BYPASS_CAPA);
2402                 *pfid2 = *lfsck_dto2fid(parent);
2403                 if (rc == -ENOENT) {
                             /* No such name under this parent: keep the linkEA
                              * entry for now and look at the next one. */
2404                         lfsck_object_put(env, parent);
2405                         linkea_next_entry(ldata);
2406                         continue;
2407                 }
2408
2409                 if (rc != 0) {
2410                         lfsck_object_put(env, parent);
2411
2412                         RETURN(rc);
2413                 }
2414
2415                 if (lu_fid_eq(tfid, cfid)) {
2416                         lfsck_object_put(env, parent);
                             /* The name entry references @child, but the ".."
                              * entry names another parent: repair that side
                              * and return. */
2417                         if (!lu_fid_eq(pfid, pfid2)) {
2418                                 *type = LNIT_UNMATCHED_PAIRS;
2419                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
2420                                                 com, child, pfid2, cname);
2421
2422                                 RETURN(rc);
2423                         }
2424
2425 rebuild:
                             /* Also entered through "goto rebuild" below, when
                              * lfsck_namespace_replace_cond() returns a
                              * positive value. A fresh linkEA holding only the
                              * <cname, pfid2> entry is built in ldata_new and
                              * handed to lfsck_namespace_rebuild_linkea(). */
2426                         /* It is the most common case that we find the
2427                          * name entry corresponding to the linkEA entry
2428                          * that matches the ".." name entry. */
2429                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2430                         if (rc != 0)
2431                                 RETURN(rc);
2432
2433                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
2434                         if (rc != 0)
2435                                 RETURN(rc);
2436
2437                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2438                                                             &ldata_new);
2439                         if (rc < 0)
2440                                 RETURN(rc);
2441
2442                         linkea_del_buf(ldata, cname);
2443                         linkea_first_entry(ldata);
2444                         /* There may be some invalid dangling name entries under
2445                          * other parent directories, remove all of them. */
                             /* Every iteration ends at "next", which drops the
                              * current entry from @ldata; presumably that also
                              * advances ld_lee, so the loop ends once the
                              * buffer has been drained. Inside the loop an
                              * error is returned only when LPF_FAILOUT is set
                              * (and never for -ENOENT from the parent lookup);
                              * otherwise the entry is simply dropped. */
2446                         while (ldata->ld_lee != NULL) {
2447                                 lfsck_namespace_unpack_linkea_entry(ldata,
2448                                                 cname, tfid, info->lti_key);
2449                                 if (!fid_is_sane(tfid))
2450                                         goto next;
2451
2452                                 parent = lfsck_object_find_bottom(env, lfsck,
2453                                                                   tfid);
2454                                 if (IS_ERR(parent)) {
2455                                         rc = PTR_ERR(parent);
2456                                         if (rc != -ENOENT &&
2457                                             bk->lb_param & LPF_FAILOUT)
2458                                                 RETURN(rc);
2459
2460                                         goto next;
2461                                 }
2462
2463                                 if (!dt_object_exists(parent)) {
2464                                         lfsck_object_put(env, parent);
2465                                         goto next;
2466                                 }
2467
2468                                 rc = lfsck_namespace_repair_dirent(env, com,
2469                                         parent, child, cname->ln_name,
2470                                         cname->ln_name, S_IFDIR, false, true);
2471                                 lfsck_object_put(env, parent);
2472                                 if (rc < 0) {
2473                                         if (bk->lb_param & LPF_FAILOUT)
2474                                                 RETURN(rc);
2475
2476                                         goto next;
2477                                 }
2478
                                     /* Accumulate the non-negative results of
                                      * lfsck_namespace_repair_dirent(). */
2479                                 count += rc;
2480
2481 next:
2482                                 linkea_del_buf(ldata, cname);
2483                         }
2484
2485                         ns->ln_dirent_repaired += count;
2486
                             /* NOTE(review): rc is the value left by the last
                              * iteration of the loop above (see the header). */
2487                         RETURN(rc);
2488                 }
2489
                     /* The lock on @child is released before the conditional
                      * replace below. */
2490                 lfsck_ibits_unlock(lh, LCK_EX);
2491                 /* The name entry references another MDT-object that may be
2492                  * created by the LFSCK for repairing dangling name entry.
2493                  * Try to replace it. */
2494                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2495                                                   tfid, cname);
2496                 lfsck_object_put(env, parent);
2497                 if (rc < 0)
2498                         RETURN(rc);
2499
2500                 if (rc > 0)
2501                         goto rebuild;
2502
                     /* rc == 0: drop this linkEA entry from the buffer and go
                      * on with the scan. */
2503                 linkea_del_buf(ldata, cname);
2504         }
2505
             /* Exactly one entry is left: handle it as the single linkEA case.
              * The retry pointer is NULL, so no lock retry can be requested. */
2506         if (ldata->ld_leh->leh_reccount == 1) {
2507                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2508                                                 lh, type, NULL);
2509
2510                 RETURN(rc);
2511         }
2512
2513         /* All linkEA entries are invalid and removed, then handle the @child
2514          * as an orphan.*/
2515         if (ldata->ld_leh->leh_reccount == 0) {
2516                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2517                                                 type);
2518
2519                 RETURN(rc);
2520         }
2521
             /* More than one entry is still in the buffer: rewind it before a
              * possible rescan. */
2522         linkea_first_entry(ldata);
2523         /* If the dangling name entry for the orphan directory object has
2524          * been removed, then just check whether the directory object is
2525          * still under the .lustre/lost+found/MDTxxxx/ or not. */
2526         if (lpf) {
2527                 lpf = false;
2528                 goto again;
2529         }
2530
2531         /* There is no linkEA entry that matches the ".." name entry. Find
2532          * the first linkEA entry that both parent and name entry exist to
2533          * rebuild a new ".." name entry. */
2534         if (once) {
2535                 once = false;
2536                 goto again;
2537         }
2538
2539         RETURN(rc);
2540 }
2541
2542 /**
2543  * Double scan the directory object for namespace LFSCK.
2544  *
2545  * This function will verify the <parent, child> pairs in the namespace tree:
2546  * the parent references the child via some name entry that should be in the
2547  * child's linkEA entry, the child should back references the parent via its
2548  * ".." name entry.
2549  *
2550  * The LFSCK will scan every linkEA entry in turn until find out the first
2551  * matched pairs. If found, then all other linkEA entries will be dropped.
2552  * If all the linkEA entries cannot match the ".." name entry, then there
2553  * are several possible cases:
2554  *
2555  * 1) If there is only one linkEA entry, then trust it as long as the PFID
2556  *    in the linkEA entry is valid.
2557  *
2558  * 2) If there are multiple linkEA entries, then try to find the linkEA
2559  *    that matches the ".." name entry. If found, then all other entries
2560  *    are invalid; otherwise, it is quite possible that the ".." name entry
2561  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
2562  *    entry according to the first valid linkEA entry (both the parent and
2563  *    the name entry should exist).
2564  *
2565  * 3) If the directory object has no (valid) linkEA entry, then the
2566  *    directory object will be handled as pure orphan and inserted
2567  *    in the .lustre/lost+found/MDTxxxx/ with the name:
2568  *    ${self_FID}-${PFID}-D-${conflict_version}
2569  *
      * The ldlm lock on @child is taken lazily: the first pass reads the ".."
      * entry and the linkEA without it, and the function jumps back to the
      * "lock" label, takes the lock and re-reads both as soon as some handling
      * that needs the lock turns out to be required. The lock is released at
      * "out" (or earlier on the orphan-insert path).
      *
      * On a positive result one of the per-type repair counters of the
      * namespace file is incremented, selected by the inconsistency type
      * reported by the helpers.
      *
2570  * \param[in] env       pointer to the thread context
2571  * \param[in] com       pointer to the lfsck component
2572  * \param[in] child     pointer to the directory object to be handled
2573  * \param[in] flags     to indicate the special checking on the @child
      *                      (not referenced by this function at present)
2574  *
2575  * \retval              positive number for repaired cases
2576  * \retval              0 if nothing to be repaired
2577  * \retval              negative error number on failure
2578  */
2579 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
2580                                            struct lfsck_component *com,
2581                                            struct dt_object *child, __u8 flags)
2582 {
2583         struct lfsck_thread_info *info          = lfsck_env_info(env);
2584         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2585         struct lu_fid            *pfid          = &info->lti_fid2;
2586         struct lfsck_namespace   *ns            = com->lc_file_ram;
2587         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2588         struct lustre_handle      lh            = { 0 };
2589         struct linkea_data        ldata         = { 0 };
2590         bool                      unknown       = false;
2591         bool                      lpf           = false;
2592         bool                      retry         = false;
2593         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
2594         int                       rc            = 0;
2595         ENTRY;
2596
2597         LASSERT(!dt_object_remote(child));
2598
             /* Adjacent string literals are concatenated verbatim, so each
              * fragment of the message carries its own trailing space. */
2599         if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
2600                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the "
2601                        "namespace LFSCK, then the LFSCK cannot guarantee "
2602                        "all the name entries have been verified in first-stage "
2603                        "scanning. So have to skip orphan related handling for "
2604                        "the directory object "DFID" with remote name entry\n",
2605                        lfsck_lfsck2name(lfsck), PFID(cfid));
2606
2607                 RETURN(0);
2608         }
2609
2610         if (unlikely(!dt_try_as_dir(env, child)))
2611                 GOTO(out, rc = -ENOTDIR);
2612
2613         /* We only take ldlm lock on the @child when required. When the
2614          * logic comes here for the first time, it is always false. */
             /* The block below is only entered through "goto lock"; after the
              * lock has been taken the checks that follow are redone with the
              * lock held. */
2615         if (0) {
2616
2617 lock:
2618                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2619                                       MDS_INODELOCK_UPDATE |
2620                                       MDS_INODELOCK_XATTR, LCK_EX);
2621                 if (rc != 0)
2622                         GOTO(out, rc);
2623         }
2624
2625         dt_read_lock(env, child, 0);
2626         if (unlikely(lfsck_is_dead_obj(child))) {
2627                 dt_read_unlock(env, child);
2628
2629                 GOTO(out, rc = 0);
2630         }
2631
             /* Read the ".." name entry of @child into pfid. */
2632         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
2633                        (const struct dt_key *)dotdot, BYPASS_CAPA);
2634         if (rc != 0) {
                     /* Only -ENOENT, -ENODATA and -EINVAL are tolerated here;
                      * any other error ends the scan of this object. */
2635                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
2636                         dt_read_unlock(env, child);
2637
2638                         GOTO(out, rc);
2639                 }
2640
                     /* Redo the lookup with the ldlm lock held before treating
                      * the ".." entry as unusable. */
2641                 if (!lustre_handle_is_used(&lh)) {
2642                         dt_read_unlock(env, child);
2643                         goto lock;
2644                 }
2645
                     /* No usable ".." entry: continue with a zero parent FID. */
2646                 fid_zero(pfid);
2647         } else if (lfsck->li_lpf_obj != NULL &&
2648                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
                     /* The ".." entry references the object kept in li_lpf_obj. */
2649                 lpf = true;
2650         }
2651
2652         rc = lfsck_links_read(env, child, &ldata);
2653         dt_read_unlock(env, child);
2654         if (rc != 0) {
2655                 if (rc != -ENODATA && rc != -EINVAL)
2656                         GOTO(out, rc);
2657
                     /* The handling below needs the ldlm lock; take it and
                      * re-read everything first. */
2658                 if (!lustre_handle_is_used(&lh))
2659                         goto lock;
2660
2661                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
2662                         /* Remove the corrupted linkEA. */
2663                         rc = lfsck_namespace_links_remove(env, com, child);
2664                         if (rc == 0)
2665                                 /* Here, because of the crashed linkEA, we
2666                                  * cannot know whether there is some parent
2667                                  * that references the child directory via
2668                                  * some name entry or not. So keep it there,
2669                                  * when the LFSCK run next time, if there is
2670                                  * some parent that references this object,
2671                                  * then the LFSCK can rebuild the linkEA;
2672                                  * otherwise, this object will be handled
2673                                  * as orphan as above. */
2674                                 unknown = true;
2675                 } else {
2676                         /* 1. If we have neither ".." nor linkEA,
2677                          *    then it is an orphan.
2678                          *
2679                          * 2. If we only have the ".." name entry,
2680                          *    but no parent references this child
2681                          *    directory, then handle it as orphan. */
                             /* The lock is dropped before the orphan is
                              * inserted. With type set to LNIT_MUL_REF a
                              * positive result is counted in
                              * ln_mul_ref_repaired at "out". */
2682                         lfsck_ibits_unlock(&lh, LCK_EX);
2683                         type = LNIT_MUL_REF;
2684                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2685                                  "-"DFID, PFID(pfid));
2686                         rc = lfsck_namespace_insert_orphan(env, com, child,
2687                                                 info->lti_tmpbuf, "D", NULL);
2688                 }
2689
2690                 GOTO(out, rc);
2691         }
2692
2693         linkea_first_entry(&ldata);
2694         /* This is the most common case: the object has unique linkEA entry. */
2695         if (ldata.ld_leh->leh_reccount == 1) {
2696                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
2697                                                 &lh, &type, &retry);
                     /* The helper sets "retry" when it would have to repair
                      * something but the lock is not held yet; take the lock
                      * and start over. */
2698                 if (retry) {
2699                         LASSERT(!lustre_handle_is_used(&lh));
2700
2701                         retry = false;
2702                         goto lock;
2703                 }
2704
2705                 GOTO(out, rc);
2706         }
2707
             /* Every remaining case is handled with the ldlm lock held. */
2708         if (!lustre_handle_is_used(&lh))
2709                 goto lock;
2710
2711         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
2712                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
2713                                                 &type);
2714
2715                 GOTO(out, rc);
2716         }
2717
2718         /* When we come here, the cases usually like that:
2719          * 1) The directory object has a corrupted linkEA entry. During the
2720          *    first-stage scanning, the LFSCK cannot know such corruption,
2721          *    then it appends the right linkEA entry according to the found
2722          *    name entry after the bad one.
2723          *
2724          * 2) The directory object has a right linkEA entry. During the
2725          *    first-stage scanning, the LFSCK finds some bad name entry,
2726          *    but the LFSCK cannot aware that at that time, then it adds
2727          *    the bad linkEA entry for further processing. */
2728         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
2729                                           &lh, &type, lpf);
2730
2731         GOTO(out, rc);
2732
2733 out:
2734         lfsck_ibits_unlock(&lh, LCK_EX);
             /* A positive rc means something was repaired: account it under
              * the counter that matches the reported inconsistency type. */
2735         if (rc > 0) {
2736                 switch (type) {
2737                 case LNIT_BAD_LINKEA:
2738                         ns->ln_linkea_repaired++;
2739                         break;
2740                 case LNIT_UNMATCHED_PAIRS:
2741                         ns->ln_unmatched_pairs_repaired++;
2742                         break;
2743                 case LNIT_MUL_REF:
2744                         ns->ln_mul_ref_repaired++;
2745                         break;
2746                 default:
2747                         break;
2748                 }
2749         }
2750
             /* Set only when a corrupted linkEA was removed without being able
              * to tell whether some parent still references @child. */
2751         if (unknown)
2752                 ns->ln_unknown_inconsistency++;
2753
2754         return rc;
2755 }
2756
2757 /**
2758  * Double scan the MDT-object for namespace LFSCK.
2759  *
2760  * If the MDT-object contains invalid or repeated linkEA entries, then drop
2761  * those entries from the linkEA; if the linkEA becomes empty or the object
2762  * has no linkEA, then it is an orphan and will be added into the directory
2763  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
2764  * the remote parent; if the name entry corresponding to some linkEA entry
2765  * is lost, then add the name entry back to the namespace.
2766  *
2767  * \param[in] env       pointer to the thread context
2768  * \param[in] com       pointer to the lfsck component
2769  * \param[in] child     pointer to the dt_object to be handled
2770  * \param[in] flags     some hints to indicate how the @child should be handled
2771  *
2772  * \retval              positive number for repaired cases
2773  * \retval              0 if nothing to be repaired
2774  * \retval              negative error number on failure
2775  */
2776 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
2777                                            struct lfsck_component *com,
2778                                            struct dt_object *child, __u8 flags)
2779 {
2780         struct lfsck_thread_info *info     = lfsck_env_info(env);
2781         struct lu_attr           *la       = &info->lti_la;
2782         struct lu_name           *cname    = &info->lti_name;
2783         struct lu_fid            *pfid     = &info->lti_fid;
2784         struct lu_fid            *cfid     = &info->lti_fid2;
2785         struct lfsck_instance    *lfsck    = com->lc_lfsck;
2786         struct lfsck_namespace   *ns       = com->lc_file_ram;
2787         struct dt_object         *parent   = NULL;
2788         struct linkea_data        ldata    = { 0 };
2789         bool                      repaired = false;
2790         int                       count    = 0;
2791         int                       rc;
2792         ENTRY;
2793
2794         dt_read_lock(env, child, 0);
2795         if (unlikely(lfsck_is_dead_obj(child))) {
2796                 dt_read_unlock(env, child);
2797
2798                 RETURN(0);
2799         }
2800
2801         if (S_ISDIR(lfsck_object_type(child))) {
2802                 dt_read_unlock(env, child);
2803                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
2804
2805                 RETURN(rc);
2806         }
2807
2808         rc = lfsck_links_read(env, child, &ldata);
2809         dt_read_unlock(env, child);
2810         if (rc != 0)
2811                 GOTO(out, rc);
2812
2813         linkea_first_entry(&ldata);
2814         while (ldata.ld_lee != NULL) {
2815                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
2816                                                     info->lti_key);
2817                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
2818                                                          false);
2819                 /* Found repeated linkEA entries */
2820                 if (rc > 0) {
2821                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2822                                                 &ldata, cname, pfid, false);
2823                         if (rc < 0)
2824                                 GOTO(out, rc);
2825
2826                         if (rc == 0)
2827                                 continue;
2828
2829                         repaired = true;
2830
2831                         /* fall through */
2832                 }
2833
2834                 /* Invalid PFID in the linkEA entry. */
2835                 if (!fid_is_sane(pfid)) {
2836                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2837                                                 &ldata, cname, pfid, true);
2838                         if (rc < 0)
2839                                 GOTO(out, rc);
2840
2841                         if (rc > 0)
2842                                 repaired = true;
2843
2844                         continue;
2845                 }
2846
2847                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
2848                 if (IS_ERR(parent))
2849                         GOTO(out, rc = PTR_ERR(parent));
2850
2851                 if (!dt_object_exists(parent)) {
2852
2853 lost_parent:
2854                         if (ldata.ld_leh->leh_reccount > 1) {
2855                                 /* If it is NOT the last linkEA entry, then
2856                                  * there is still other chance to make the
2857                                  * child to be visible via other parent, then
2858                                  * remove this linkEA entry. */
2859                                 rc = lfsck_namespace_shrink_linkea(env, com,
2860                                         child, &ldata, cname, pfid, true);
2861                         } else {
2862                                 /* Create the lost parent as an orphan. */
2863                                 rc = lfsck_namespace_create_orphan(env, com,
2864                                                                    parent);
2865                                 if (rc < 0) {
2866                                         lfsck_object_put(env, parent);
2867
2868                                         GOTO(out, rc);
2869                                 }
2870
2871                                 if (rc > 0)
2872                                         repaired = true;
2873
2874                                 /* Add the missing name entry to the parent. */
2875                                 rc = lfsck_namespace_insert_normal(env, com,
2876                                                 parent, child, cname->ln_name);
2877                                 if (unlikely(rc == -EEXIST))
2878                                         /* Unfortunately, someone reused the
2879                                          * name under the parent by race. So we
2880                                          * have to remove the linkEA entry from
2881                                          * current child object. It means that
2882                                          * the LFSCK cannot recover the system
2883                                          * totally back to its original status,
2884                                          * but it is necessary to make the
2885                                          * current system to be consistent. */
2886                                         rc = lfsck_namespace_shrink_linkea(env,
2887                                                         com, child, &ldata,
2888                                                         cname, pfid, true);
2889                                 else
2890                                         linkea_next_entry(&ldata);
2891                         }
2892
2893                         lfsck_object_put(env, parent);
2894                         if (rc < 0)
2895                                 GOTO(out, rc);
2896
2897                         if (rc > 0)
2898                                 repaired = true;
2899
2900                         continue;
2901                 }
2902
2903                 /* The linkEA entry with bad parent will be removed. */
2904                 if (unlikely(!dt_try_as_dir(env, parent))) {
2905                         lfsck_object_put(env, parent);
2906                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2907                                                 &ldata, cname, pfid, true);
2908                         if (rc < 0)
2909                                 GOTO(out, rc);
2910
2911                         if (rc > 0)
2912                                 repaired = true;
2913
2914                         continue;
2915                 }
2916
2917                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
2918                                (const struct dt_key *)cname->ln_name,
2919                                BYPASS_CAPA);
2920                 if (rc != 0 && rc != -ENOENT) {
2921                         lfsck_object_put(env, parent);
2922
2923                         GOTO(out, rc);
2924                 }
2925
2926                 if (rc == 0) {
2927                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
2928                                 /* It is the most common case that we
2929                                  * find the name entry corresponding
2930                                  * to the linkEA entry. */
2931                                 lfsck_object_put(env, parent);
2932                                 linkea_next_entry(&ldata);
2933                         } else {
2934                                 /* The name entry references another
2935                                  * MDT-object that may be created by
2936                                  * the LFSCK for repairing dangling
2937                                  * name entry. Try to replace it. */
2938                                 rc = lfsck_namespace_replace_cond(env, com,
2939                                                 parent, child, cfid, cname);
2940                                 lfsck_object_put(env, parent);
2941                                 if (rc < 0)
2942                                         GOTO(out, rc);
2943
2944                                 if (rc > 0) {
2945                                         repaired = true;
2946                                         linkea_next_entry(&ldata);
2947                                 } else {
2948                                         rc = lfsck_namespace_shrink_linkea(env,
2949                                                         com, child, &ldata,
2950                                                         cname, pfid, true);
2951                                         if (rc < 0)
2952                                                 GOTO(out, rc);
2953
2954                                         if (rc > 0)
2955                                                 repaired = true;
2956                                 }
2957                         }
2958
2959                         continue;
2960                 }
2961
2962                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2963                 if (rc != 0)
2964                         GOTO(out, rc);
2965
2966                 /* If there is no name entry in the parent dir and the object
2967                  * link count is less than the linkea entries count, then the
2968                  * linkea entry should be removed. */
2969                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
2970                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2971                                         parent, child, &ldata, cname, pfid);
2972                         lfsck_object_put(env, parent);
2973                         if (rc < 0)
2974                                 GOTO(out, rc);
2975
2976                         if (rc > 0)
2977                                 repaired = true;
2978
2979                         continue;
2980                 }
2981
2982                 /* Add the missing name entry back to the namespace. */
2983                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2984                                                    cname->ln_name);
2985                 if (unlikely(rc == -ESTALE))
2986                         /* It may happen when the remote object has been
2987                          * removed, but the local MDT is not aware of that. */
2988                         goto lost_parent;
2989
2990                 if (unlikely(rc == -EEXIST))
2991                         /* Unfortunately, someone reused the name under the
2992                          * parent by race. So we have to remove the linkEA
2993                          * entry from current child object. It means that the
2994                          * LFSCK cannot recover the system totally back to
2995                          * its original status, but it is necessary to make
2996                          * the current system to be consistent.
2997                          *
2998                          * It also may be because of the LFSCK found some
2999                          * internal status of create operation. Under such
3000                          * case, nothing to be done. */
3001                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
3002                                         parent, child, &ldata, cname, pfid);
3003                 else
3004                         linkea_next_entry(&ldata);
3005
3006                 lfsck_object_put(env, parent);
3007                 if (rc < 0)
3008                         GOTO(out, rc);
3009
3010                 if (rc > 0)
3011                         repaired = true;
3012         }
3013
3014         GOTO(out, rc = 0);
3015
3016 out:
3017         if (rc < 0 && rc != -ENODATA)
3018                 return rc;
3019
3020         if (rc == 0) {
3021                 LASSERT(ldata.ld_leh != NULL);
3022
3023                 count = ldata.ld_leh->leh_reccount;
3024         }
3025
3026         if (count == 0) {
3027                 /* If the child becomes orphan, then insert it into
3028                  * the global .lustre/lost+found/MDTxxxx directory. */
3029                 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
3030                                                    &count);
3031                 if (rc < 0)
3032                         return rc;
3033
3034                 if (rc > 0) {
3035                         ns->ln_mul_ref_repaired++;
3036                         repaired = true;
3037                 }
3038         }
3039
3040         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
3041         if (rc != 0)
3042                 return rc;
3043
3044         if (la->la_nlink != count) {
3045                 /* XXX: there will be other patch(es) for MDT-object
3046                  *      hard links verification. */
3047         }
3048
3049         if (repaired) {
3050                 if (la->la_nlink > 1)
3051                         ns->ln_mul_linked_repaired++;
3052
3053                 if (rc == 0)
3054                         rc = 1;
3055         }
3056
3057         return rc;
3058 }
3059
3060 static void lfsck_namespace_dump_statistics(struct seq_file *m,
3061                                             struct lfsck_namespace *ns,
3062                                             __u64 checked_phase1,
3063                                             __u64 checked_phase2,
3064                                             __u32 time_phase1,
3065                                             __u32 time_phase2)
3066 {
3067         seq_printf(m, "checked_phase1: "LPU64"\n"
3068                       "checked_phase2: "LPU64"\n"
3069                       "updated_phase1: "LPU64"\n"
3070                       "updated_phase2: "LPU64"\n"
3071                       "failed_phase1: "LPU64"\n"
3072                       "failed_phase2: "LPU64"\n"
3073                       "directories: "LPU64"\n"
3074                       "dirent_repaired: "LPU64"\n"
3075                       "linkea_repaired: "LPU64"\n"
3076                       "nlinks_repaired: "LPU64"\n"
3077                       "multiple_linked_checked: "LPU64"\n"
3078                       "multiple_linked_repaired: "LPU64"\n"
3079                       "unknown_inconsistency: "LPU64"\n"
3080                       "unmatched_pairs_repaired: "LPU64"\n"
3081                       "dangling_repaired: "LPU64"\n"
3082                       "multiple_referenced_repaired: "LPU64"\n"
3083                       "bad_file_type_repaired: "LPU64"\n"
3084                       "lost_dirent_repaired: "LPU64"\n"
3085                       "success_count: %u\n"
3086                       "run_time_phase1: %u seconds\n"
3087                       "run_time_phase2: %u seconds\n",
3088                       checked_phase1,
3089                       checked_phase2,
3090                       ns->ln_items_repaired,
3091                       ns->ln_objs_repaired_phase2,
3092                       ns->ln_items_failed,
3093                       ns->ln_objs_failed_phase2,
3094                       ns->ln_dirs_checked,
3095                       ns->ln_dirent_repaired,
3096                       ns->ln_linkea_repaired,
3097                       ns->ln_objs_nlink_repaired,
3098                       ns->ln_mul_linked_checked,
3099                       ns->ln_mul_linked_repaired,
3100                       ns->ln_unknown_inconsistency,
3101                       ns->ln_unmatched_pairs_repaired,
3102                       ns->ln_dangling_repaired,
3103                       ns->ln_mul_ref_repaired,
3104                       ns->ln_bad_type_repaired,
3105                       ns->ln_lost_dirent_repaired,
3106                       ns->ln_success_count,
3107                       time_phase1,
3108                       time_phase2);
3109 }
3110
3111 /* namespace APIs */
3112
3113 static int lfsck_namespace_reset(const struct lu_env *env,
3114                                  struct lfsck_component *com, bool init)
3115 {
3116         struct lfsck_instance   *lfsck = com->lc_lfsck;
3117         struct lfsck_namespace  *ns    = com->lc_file_ram;
3118         struct dt_object        *root;
3119         struct dt_object        *dto;
3120         int                      rc;
3121         ENTRY;
3122
3123         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
3124         if (IS_ERR(root))
3125                 GOTO(log, rc = PTR_ERR(root));
3126
3127         if (unlikely(!dt_try_as_dir(env, root)))
3128                 GOTO(put, rc = -ENOTDIR);
3129
3130         down_write(&com->lc_sem);
3131         if (init) {
3132                 memset(ns, 0, sizeof(*ns));
3133         } else {
3134                 __u32 count = ns->ln_success_count;
3135                 __u64 last_time = ns->ln_time_last_complete;
3136
3137                 memset(ns, 0, sizeof(*ns));
3138                 ns->ln_success_count = count;
3139                 ns->ln_time_last_complete = last_time;
3140         }
3141         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
3142         ns->ln_status = LS_INIT;
3143
3144         rc = local_object_unlink(env, lfsck->li_bottom, root,
3145                                  lfsck_namespace_name);
3146         if (rc != 0)
3147                 GOTO(out, rc);
3148
3149         lfsck_object_put(env, com->lc_obj);
3150         com->lc_obj = NULL;
3151         dto = local_index_find_or_create(env, lfsck->li_los, root,
3152                                          lfsck_namespace_name,
3153                                          S_IFREG | S_IRUGO | S_IWUSR,
3154                                          &dt_lfsck_features);
3155         if (IS_ERR(dto))
3156                 GOTO(out, rc = PTR_ERR(dto));
3157
3158         com->lc_obj = dto;
3159         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
3160         if (rc != 0)
3161                 GOTO(out, rc);
3162
3163         rc = lfsck_namespace_store(env, com, true);
3164
3165         GOTO(out, rc);
3166
3167 out:
3168         up_write(&com->lc_sem);
3169
3170 put:
3171         lu_object_put(env, &root->do_lu);
3172 log:
3173         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
3174                lfsck_lfsck2name(lfsck), rc);
3175         return rc;
3176 }
3177
3178 static void
3179 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
3180                      bool new_checked)
3181 {
3182         struct lfsck_namespace *ns = com->lc_file_ram;
3183
3184         down_write(&com->lc_sem);
3185         if (new_checked)
3186                 com->lc_new_checked++;
3187         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3188         up_write(&com->lc_sem);
3189 }
3190
3191 static int lfsck_namespace_checkpoint(const struct lu_env *env,
3192                                       struct lfsck_component *com, bool init)
3193 {
3194         struct lfsck_instance   *lfsck = com->lc_lfsck;
3195         struct lfsck_namespace  *ns    = com->lc_file_ram;
3196         int                      rc;
3197
3198         if (!init) {
3199                 rc = lfsck_checkpoint_generic(env, com);
3200                 if (rc != 0)
3201                         goto log;
3202         }
3203
3204         down_write(&com->lc_sem);
3205         if (init) {
3206                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
3207         } else {
3208                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
3209                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3210                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3211                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
3212                 ns->ln_items_checked += com->lc_new_checked;
3213                 com->lc_new_checked = 0;
3214         }
3215
3216         rc = lfsck_namespace_store(env, com, false);
3217         up_write(&com->lc_sem);
3218
3219 log:
3220         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
3221                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3222                lfsck->li_pos_current.lp_oit_cookie,
3223                PFID(&lfsck->li_pos_current.lp_dir_parent),
3224                lfsck->li_pos_current.lp_dir_cookie, rc);
3225
3226         return rc > 0 ? 0 : rc;
3227 }
3228
3229 static int lfsck_namespace_prep(const struct lu_env *env,
3230                                 struct lfsck_component *com,
3231                                 struct lfsck_start_param *lsp)
3232 {
3233         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3234         struct lfsck_namespace  *ns     = com->lc_file_ram;
3235         struct lfsck_position   *pos    = &com->lc_pos_start;
3236         int                      rc;
3237
3238         if (ns->ln_status == LS_COMPLETED) {
3239                 rc = lfsck_namespace_reset(env, com, false);
3240                 if (rc == 0)
3241                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
3242
3243                 if (rc != 0) {
3244                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
3245                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
3246
3247                         return rc;
3248                 }
3249         }
3250
3251         down_write(&com->lc_sem);
3252         ns->ln_time_latest_start = cfs_time_current_sec();
3253         spin_lock(&lfsck->li_lock);
3254
3255         if (ns->ln_flags & LF_SCANNED_ONCE) {
3256                 if (!lfsck->li_drop_dryrun ||
3257                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3258                         ns->ln_status = LS_SCANNING_PHASE2;
3259                         list_move_tail(&com->lc_link,
3260                                        &lfsck->li_list_double_scan);
3261                         if (!list_empty(&com->lc_link_dir))
3262                                 list_del_init(&com->lc_link_dir);
3263                         lfsck_pos_set_zero(pos);
3264                 } else {
3265                         ns->ln_status = LS_SCANNING_PHASE1;
3266                         ns->ln_run_time_phase1 = 0;
3267                         ns->ln_run_time_phase2 = 0;
3268                         ns->ln_items_checked = 0;
3269                         ns->ln_items_repaired = 0;
3270                         ns->ln_items_failed = 0;
3271                         ns->ln_dirs_checked = 0;
3272                         ns->ln_objs_checked_phase2 = 0;
3273                         ns->ln_objs_repaired_phase2 = 0;
3274                         ns->ln_objs_failed_phase2 = 0;
3275                         ns->ln_objs_nlink_repaired = 0;
3276                         ns->ln_dirent_repaired = 0;
3277                         ns->ln_linkea_repaired = 0;
3278                         ns->ln_mul_linked_checked = 0;
3279                         ns->ln_mul_linked_repaired = 0;
3280                         ns->ln_unknown_inconsistency = 0;
3281                         ns->ln_unmatched_pairs_repaired = 0;
3282                         ns->ln_dangling_repaired = 0;
3283                         ns->ln_mul_ref_repaired = 0;
3284                         ns->ln_bad_type_repaired = 0;
3285                         ns->ln_lost_dirent_repaired = 0;
3286                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
3287                         if (list_empty(&com->lc_link_dir))
3288                                 list_add_tail(&com->lc_link_dir,
3289                                               &lfsck->li_list_dir);
3290                         *pos = ns->ln_pos_first_inconsistent;
3291                 }
3292         } else {
3293                 ns->ln_status = LS_SCANNING_PHASE1;
3294                 if (list_empty(&com->lc_link_dir))
3295                         list_add_tail(&com->lc_link_dir,
3296                                       &lfsck->li_list_dir);
3297                 if (!lfsck->li_drop_dryrun ||
3298                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3299                         *pos = ns->ln_pos_last_checkpoint;
3300                         pos->lp_oit_cookie++;
3301                 } else {
3302                         *pos = ns->ln_pos_first_inconsistent;
3303                 }
3304         }
3305
3306         spin_unlock(&lfsck->li_lock);
3307         up_write(&com->lc_sem);
3308
3309         rc = lfsck_start_assistant(env, com, lsp);
3310
3311         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
3312                DFID", "LPX64"]: rc = %d\n",
3313                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
3314                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
3315
3316         return rc;
3317 }
3318
3319 static int lfsck_namespace_exec_oit(const struct lu_env *env,
3320                                     struct lfsck_component *com,
3321                                     struct dt_object *obj)
3322 {
3323         struct lfsck_thread_info *info  = lfsck_env_info(env);
3324         struct lfsck_namespace   *ns    = com->lc_file_ram;
3325         struct lfsck_instance    *lfsck = com->lc_lfsck;
3326         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
3327         struct lu_attr           *la    = &info->lti_la;
3328         struct lu_fid            *pfid  = &info->lti_fid2;
3329         struct lu_name           *cname = &info->lti_name;
3330         struct lu_seq_range      *range = &info->lti_range;
3331         struct dt_device         *dev   = lfsck->li_bottom;
3332         struct seq_server_site   *ss    =
3333                                 lu_site2seq(dev->dd_lu_dev.ld_site);
3334         struct linkea_data        ldata = { 0 };
3335         __u32                     idx   = lfsck_dev_idx(dev);
3336         int                       rc;
3337         ENTRY;
3338
3339         rc = lfsck_links_read(env, obj, &ldata);
3340         if (rc == -ENOENT)
3341                 GOTO(out, rc = 0);
3342
3343         /* -EINVAL means crashed linkEA, should be verified. */
3344         if (rc == -EINVAL) {
3345                 rc = lfsck_namespace_trace_update(env, com, fid,
3346                                                   LNTF_CHECK_LINKEA, true);
3347                 if (rc == 0) {
3348                         struct lustre_handle lh = { 0 };
3349
3350                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
3351                                               MDS_INODELOCK_UPDATE |
3352                                               MDS_INODELOCK_XATTR, LCK_EX);
3353                         if (rc == 0) {
3354                                 rc = lfsck_namespace_links_remove(env, com,
3355                                                                   obj);
3356                                 lfsck_ibits_unlock(&lh, LCK_EX);
3357                         }
3358                 }
3359
3360                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
3361         }
3362
3363         /* zero-linkEA object may be orphan, but it also maybe because
3364          * of upgrading. Currently, we cannot record it for double scan.
3365          * Because it may cause the LFSCK tracing file to be too large. */
3366         if (rc == -ENODATA) {
3367                 if (S_ISDIR(lfsck_object_type(obj)))
3368                         GOTO(out, rc = 0);
3369
3370                 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3371                 if (rc != 0)
3372                         GOTO(out, rc);
3373
3374                 if (la->la_nlink > 1)
3375                         rc = lfsck_namespace_trace_update(env, com, fid,
3376                                                 LNTF_CHECK_LINKEA, true);
3377
3378                 GOTO(out, rc);
3379         }
3380
3381         if (rc != 0)
3382                 GOTO(out, rc);
3383
3384         /* Record multiple-linked object. */
3385         if (ldata.ld_leh->leh_reccount > 1) {
3386                 rc = lfsck_namespace_trace_update(env, com, fid,
3387                                                   LNTF_CHECK_LINKEA, true);
3388
3389                 GOTO(out, rc);
3390         }
3391
3392         linkea_first_entry(&ldata);
3393         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
3394         if (!fid_is_sane(pfid)) {
3395                 rc = lfsck_namespace_trace_update(env, com, fid,
3396                                                   LNTF_CHECK_PARENT, true);
3397         } else {
3398                 fld_range_set_mdt(range);
3399                 rc = fld_local_lookup(env, ss->ss_server_fld,
3400                                       fid_seq(pfid), range);
3401                 if ((rc == -ENOENT) ||
3402                     (rc == 0 && range->lsr_index != idx)) {
3403                         rc = lfsck_namespace_trace_update(env, com, fid,
3404                                                 LNTF_CHECK_LINKEA, true);
3405                 } else {
3406                         if (S_ISDIR(lfsck_object_type(obj)))
3407                                 GOTO(out, rc = 0);
3408
3409                         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3410                         if (rc != 0)
3411                                 GOTO(out, rc);
3412
3413                         if (la->la_nlink > 1)
3414                                 rc = lfsck_namespace_trace_update(env, com,
3415                                                 fid, LNTF_CHECK_LINKEA, true);
3416                 }
3417         }
3418
3419         GOTO(out, rc);
3420
3421 out:
3422         down_write(&com->lc_sem);
3423         com->lc_new_checked++;
3424         if (S_ISDIR(lfsck_object_type(obj)))
3425                 ns->ln_dirs_checked++;
3426         if (rc != 0)
3427                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3428         up_write(&com->lc_sem);
3429
3430         return rc;
3431 }
3432
3433 static int lfsck_namespace_exec_dir(const struct lu_env *env,
3434                                     struct lfsck_component *com,
3435                                     struct lu_dirent *ent, __u16 type)
3436 {
3437         struct lfsck_assistant_data     *lad    = com->lc_data;
3438         struct lfsck_namespace_req      *lnr;
3439         bool                             wakeup = false;
3440
3441         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
3442         if (IS_ERR(lnr)) {
3443                 struct lfsck_namespace *ns = com->lc_file_ram;
3444
3445                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3446                 return PTR_ERR(lnr);
3447         }
3448
3449         spin_lock(&lad->lad_lock);
3450         if (lad->lad_assistant_status < 0) {
3451                 spin_unlock(&lad->lad_lock);
3452                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
3453                 return lad->lad_assistant_status;
3454         }
3455
3456         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
3457         if (lad->lad_prefetched == 0)
3458                 wakeup = true;
3459
3460         lad->lad_prefetched++;
3461         spin_unlock(&lad->lad_lock);
3462         if (wakeup)
3463                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
3464
3465         down_write(&com->lc_sem);
3466         com->lc_new_checked++;
3467         up_write(&com->lc_sem);
3468
3469         return 0;
3470 }
3471
3472 static int lfsck_namespace_post(const struct lu_env *env,
3473                                 struct lfsck_component *com,
3474                                 int result, bool init)
3475 {
3476         struct lfsck_instance   *lfsck = com->lc_lfsck;
3477         struct lfsck_namespace  *ns    = com->lc_file_ram;
3478         int                      rc;
3479         ENTRY;
3480
3481         lfsck_post_generic(env, com, &result);
3482
3483         down_write(&com->lc_sem);
3484         spin_lock(&lfsck->li_lock);
3485         if (!init)
3486                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
3487         if (result > 0) {
3488                 ns->ln_status = LS_SCANNING_PHASE2;
3489                 ns->ln_flags |= LF_SCANNED_ONCE;
3490                 ns->ln_flags &= ~LF_UPGRADE;
3491                 list_del_init(&com->lc_link_dir);
3492                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
3493         } else if (result == 0) {
3494                 ns->ln_status = lfsck->li_status;
3495                 if (ns->ln_status == 0)
3496                         ns->ln_status = LS_STOPPED;
3497                 if (ns->ln_status != LS_PAUSED) {
3498                         list_del_init(&com->lc_link_dir);
3499                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
3500                 }
3501         } else {
3502                 ns->ln_status = LS_FAILED;
3503                 list_del_init(&com->lc_link_dir);
3504                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
3505         }
3506         spin_unlock(&lfsck->li_lock);
3507
3508         if (!init) {
3509                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3510                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3511                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
3512                 ns->ln_items_checked += com->lc_new_checked;
3513                 com->lc_new_checked = 0;
3514         }
3515
3516         rc = lfsck_namespace_store(env, com, false);
3517         up_write(&com->lc_sem);
3518
3519         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
3520                lfsck_lfsck2name(lfsck), rc);
3521
3522         RETURN(rc);
3523 }
3524
3525 static int
3526 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
3527                      struct seq_file *m)
3528 {
3529         struct lfsck_instance   *lfsck = com->lc_lfsck;
3530         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
3531         struct lfsck_namespace  *ns    = com->lc_file_ram;
3532         int                      rc;
3533
3534         down_read(&com->lc_sem);
3535         seq_printf(m, "name: lfsck_namespace\n"
3536                    "magic: %#x\n"
3537                    "version: %d\n"
3538                    "status: %s\n",
3539                    ns->ln_magic,
3540                    bk->lb_version,
3541                    lfsck_status2names(ns->ln_status));
3542
3543         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
3544         if (rc < 0)
3545                 goto out;
3546
3547         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
3548         if (rc < 0)
3549                 goto out;
3550
3551         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
3552                              "time_since_last_completed");
3553         if (rc < 0)
3554                 goto out;
3555
3556         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
3557                              "time_since_latest_start");
3558         if (rc < 0)
3559                 goto out;
3560
3561         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
3562                              "time_since_last_checkpoint");
3563         if (rc < 0)
3564                 goto out;
3565
3566         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
3567                             "latest_start_position");
3568         if (rc < 0)
3569                 goto out;
3570
3571         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
3572                             "last_checkpoint_position");
3573         if (rc < 0)
3574                 goto out;
3575
3576         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
3577                             "first_failure_position");
3578         if (rc < 0)
3579                 goto out;
3580
3581         if (ns->ln_status == LS_SCANNING_PHASE1) {
3582                 struct lfsck_position pos;
3583                 const struct dt_it_ops *iops;
3584                 cfs_duration_t duration = cfs_time_current() -
3585                                           lfsck->li_time_last_checkpoint;
3586                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
3587                 __u64 speed = checked;
3588                 __u64 new_checked = com->lc_new_checked * HZ;
3589                 __u32 rtime = ns->ln_run_time_phase1 +
3590                               cfs_duration_sec(duration + HALF_SEC);
3591
3592                 if (duration != 0)
3593                         do_div(new_checked, duration);
3594                 if (rtime != 0)
3595                         do_div(speed, rtime);
3596                 lfsck_namespace_dump_statistics(m, ns, checked,
3597                                                 ns->ln_objs_checked_phase2,
3598                                                 rtime, ns->ln_run_time_phase2);
3599
3600                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3601                               "average_speed_phase2: N/A\n"
3602                               "real_time_speed_phase1: "LPU64" items/sec\n"
3603                               "real_time_speed_phase2: N/A\n",
3604                               speed,
3605                               new_checked);
3606
3607                 LASSERT(lfsck->li_di_oit != NULL);
3608
3609                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
3610
3611                 /* The low layer otable-based iteration position may NOT
3612                  * exactly match the namespace-based directory traversal
3613                  * cookie. Generally, it is not a serious issue. But the
3614                  * caller should NOT make assumption on that. */
3615                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
3616                 if (!lfsck->li_current_oit_processed)
3617                         pos.lp_oit_cookie--;
3618
3619                 spin_lock(&lfsck->li_lock);
3620                 if (lfsck->li_di_dir != NULL) {
3621                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
3622                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
3623                                 fid_zero(&pos.lp_dir_parent);
3624                                 pos.lp_dir_cookie = 0;
3625                         } else {
3626                                 pos.lp_dir_parent =
3627                                         *lfsck_dto2fid(lfsck->li_obj_dir);
3628                         }
3629                 } else {
3630                         fid_zero(&pos.lp_dir_parent);
3631                         pos.lp_dir_cookie = 0;
3632                 }
3633                 spin_unlock(&lfsck->li_lock);
3634                 lfsck_pos_dump(m, &pos, "current_position");
3635         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
3636                 cfs_duration_t duration = cfs_time_current() -
3637                                           lfsck->li_time_last_checkpoint;
3638                 __u64 checked = ns->ln_objs_checked_phase2 +
3639                                 com->lc_new_checked;
3640                 __u64 speed1 = ns->ln_items_checked;
3641                 __u64 speed2 = checked;
3642                 __u64 new_checked = com->lc_new_checked * HZ;
3643                 __u32 rtime = ns->ln_run_time_phase2 +
3644                               cfs_duration_sec(duration + HALF_SEC);
3645
3646                 if (duration != 0)
3647                         do_div(new_checked, duration);
3648                 if (ns->ln_run_time_phase1 != 0)
3649                         do_div(speed1, ns->ln_run_time_phase1);
3650                 if (rtime != 0)
3651                         do_div(speed2, rtime);
3652                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
3653                                                 checked,
3654                                                 ns->ln_run_time_phase1, rtime);
3655
3656                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3657                               "average_speed_phase2: "LPU64" objs/sec\n"
3658                               "real_time_speed_phase1: N/A\n"
3659                               "real_time_speed_phase2: "LPU64" objs/sec\n"
3660                               "current_position: "DFID"\n",
3661                               speed1,
3662                               speed2,
3663                               new_checked,
3664                               PFID(&ns->ln_fid_latest_scanned_phase2));
3665         } else {
3666                 __u64 speed1 = ns->ln_items_checked;
3667                 __u64 speed2 = ns->ln_objs_checked_phase2;
3668
3669                 if (ns->ln_run_time_phase1 != 0)
3670                         do_div(speed1, ns->ln_run_time_phase1);
3671                 if (ns->ln_run_time_phase2 != 0)
3672                         do_div(speed2, ns->ln_run_time_phase2);
3673                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
3674                                                 ns->ln_objs_checked_phase2,
3675                                                 ns->ln_run_time_phase1,
3676                                                 ns->ln_run_time_phase2);
3677
3678                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3679                               "average_speed_phase2: "LPU64" objs/sec\n"
3680                               "real_time_speed_phase1: N/A\n"
3681                               "real_time_speed_phase2: N/A\n"
3682                               "current_position: N/A\n",
3683                               speed1,
3684                               speed2);
3685         }
3686 out:
3687         up_read(&com->lc_sem);
3688         return 0;
3689 }
3690
3691 static int lfsck_namespace_double_scan(const struct lu_env *env,
3692                                        struct lfsck_component *com)
3693 {
3694         struct lfsck_namespace *ns = com->lc_file_ram;
3695
3696         return lfsck_double_scan_generic(env, com, ns->ln_status);
3697 }
3698
3699 static void lfsck_namespace_data_release(const struct lu_env *env,
3700                                          struct lfsck_component *com)
3701 {
3702         struct lfsck_assistant_data     *lad    = com->lc_data;
3703         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
3704         struct lfsck_tgt_desc           *ltd;
3705         struct lfsck_tgt_desc           *next;
3706
3707         LASSERT(lad != NULL);
3708         LASSERT(thread_is_init(&lad->lad_thread) ||
3709                 thread_is_stopped(&lad->lad_thread));
3710         LASSERT(list_empty(&lad->lad_req_list));
3711
3712         com->lc_data = NULL;
3713
3714         spin_lock(&ltds->ltd_lock);
3715         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
3716                                  ltd_namespace_phase_list) {
3717                 list_del_init(&ltd->ltd_namespace_phase_list);
3718         }
3719         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
3720                                  ltd_namespace_phase_list) {
3721                 list_del_init(&ltd->ltd_namespace_phase_list);
3722         }
3723         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
3724                                  ltd_namespace_list) {
3725                 list_del_init(&ltd->ltd_namespace_list);
3726         }
3727         spin_unlock(&ltds->ltd_lock);
3728
3729         CFS_FREE_BITMAP(lad->lad_bitmap);
3730
3731         OBD_FREE_PTR(lad);
3732 }
3733
3734 static int lfsck_namespace_in_notify(const struct lu_env *env,
3735                                      struct lfsck_component *com,
3736                                      struct lfsck_request *lr)
3737 {
3738         struct lfsck_instance           *lfsck = com->lc_lfsck;
3739         struct lfsck_namespace          *ns    = com->lc_file_ram;
3740         struct lfsck_assistant_data     *lad   = com->lc_data;
3741         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
3742         struct lfsck_tgt_desc           *ltd;
3743         int                              rc;
3744         bool                             fail  = false;
3745         ENTRY;
3746
3747         switch (lr->lr_event) {
3748         case LE_CREATE_ORPHAN: {
3749                 struct dt_object *orphan = NULL;
3750
3751                 CDEBUG(D_LFSCK, "%s: namespace LFSCK handling notify from "
3752                        "MDT %x to create orphan"DFID" with type %o\n",
3753                        lfsck_lfsck2name(lfsck), lr->lr_index,
3754                        PFID(&lr->lr_fid), lr->lr_type);
3755
3756                 orphan = lfsck_object_find(env, lfsck, &lr->lr_fid);
3757                 if (IS_ERR(orphan))
3758                         GOTO(out_create, rc = PTR_ERR(orphan));
3759
3760                 if (dt_object_exists(orphan))
3761                         GOTO(out_create, rc = -EEXIST);
3762
3763                 rc = lfsck_namespace_create_orphan_local(env, com, orphan,
3764                                                          lr->lr_type);
3765
3766                 GOTO(out_create, rc = (rc == 1) ? 0 : rc);
3767
3768 out_create:
3769                 CDEBUG(D_LFSCK, "%s: namespace LFSCK handled notify from "
3770                        "MDT %x to create orphan"DFID" with type %o: rc = %d\n",
3771                        lfsck_lfsck2name(lfsck), lr->lr_index,
3772                        PFID(&lr->lr_fid), lr->lr_type, rc);
3773
3774                 if (orphan != NULL && !IS_ERR(orphan))
3775                         lfsck_object_put(env, orphan);
3776
3777                 return rc;
3778         }
3779         case LE_PHASE1_DONE:
3780         case LE_PHASE2_DONE:
3781         case LE_PEER_EXIT:
3782                 break;
3783         default:
3784                 RETURN(-EINVAL);
3785         }
3786
3787         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
3788                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
3789                lr->lr_index, lr->lr_status);
3790
3791         spin_lock(&ltds->ltd_lock);
3792         ltd = LTD_TGT(ltds, lr->lr_index);
3793         if (ltd == NULL) {
3794                 spin_unlock(&ltds->ltd_lock);
3795
3796                 RETURN(-ENXIO);
3797         }
3798
3799         list_del_init(&ltd->ltd_namespace_phase_list);
3800         switch (lr->lr_event) {
3801         case LE_PHASE1_DONE:
3802                 if (lr->lr_status <= 0) {
3803                         ltd->ltd_namespace_done = 1;
3804                         list_del_init(&ltd->ltd_namespace_list);
3805                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
3806                                "phase1 for namespace LFSCK: rc = %d.\n",
3807                                lfsck_lfsck2name(lfsck),
3808                                ltd->ltd_index, lr->lr_status);
3809                         ns->ln_flags |= LF_INCOMPLETE;
3810                         fail = true;
3811                         break;
3812                 }
3813
3814                 if (list_empty(&ltd->ltd_namespace_list))
3815                         list_add_tail(&ltd->ltd_namespace_list,
3816                                       &lad->lad_mdt_list);
3817                 list_add_tail(&ltd->ltd_namespace_phase_list,
3818                               &lad->lad_mdt_phase2_list);
3819                 break;
3820         case LE_PHASE2_DONE:
3821                 ltd->ltd_namespace_done = 1;
3822                 list_del_init(&ltd->ltd_namespace_list);
3823                 break;
3824         case LE_PEER_EXIT:
3825                 fail = true;
3826                 ltd->ltd_namespace_done = 1;
3827                 list_del_init(&ltd->ltd_namespace_list);
3828                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
3829                         CDEBUG(D_LFSCK,
3830                                "%s: the peer MDT %x exit namespace LFSCK\n",
3831                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
3832                         ns->ln_flags |= LF_INCOMPLETE;
3833                 }
3834                 break;
3835         default:
3836                 break;
3837         }
3838         spin_unlock(&ltds->ltd_lock);
3839
3840         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
3841                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3842
3843                 memset(stop, 0, sizeof(*stop));
3844                 stop->ls_status = lr->lr_status;
3845                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3846                 lfsck_stop(env, lfsck->li_bottom, stop);
3847         } else if (lfsck_phase2_next_ready(lad)) {
3848                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
3849         }
3850
3851         RETURN(0);
3852 }
3853
3854 static int lfsck_namespace_query(const struct lu_env *env,
3855                                  struct lfsck_component *com)
3856 {
3857         struct lfsck_namespace *ns = com->lc_file_ram;
3858
3859         return ns->ln_status;
3860 }
3861
3862 static struct lfsck_operations lfsck_namespace_ops = {
3863         .lfsck_reset            = lfsck_namespace_reset,
3864         .lfsck_fail             = lfsck_namespace_fail,
3865         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
3866         .lfsck_prep             = lfsck_namespace_prep,
3867         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
3868         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
3869         .lfsck_post             = lfsck_namespace_post,
3870         .lfsck_dump             = lfsck_namespace_dump,
3871         .lfsck_double_scan      = lfsck_namespace_double_scan,
3872         .lfsck_data_release     = lfsck_namespace_data_release,
3873         .lfsck_quit             = lfsck_quit_generic,
3874         .lfsck_in_notify        = lfsck_namespace_in_notify,
3875         .lfsck_query            = lfsck_namespace_query,
3876 };
3877
3878 /**
3879  * Repair dangling name entry.
3880  *
3881  * For the name entry with dangling reference, we need to repare the
3882  * inconsistency according to the LFSCK sponsor's requirement:
3883  *
3884  * 1) Keep the inconsistency there and report the inconsistency case,
3885  *    then give the chance to the application to find related issues,
3886  *    and the users can make the decision about how to handle it with
3887  *    more human knownledge. (by default)
3888  *
3889  * 2) Re-create the missing MDT-object with the FID information.
3890  *
3891  * \param[in] env       pointer to the thread context
3892  * \param[in] com       pointer to the lfsck component
3893  * \param[in] child     pointer to the object corresponding to the dangling
3894  *                      name entry
3895  * \param[in] lnr       pointer to the namespace request that contains the
3896  *                      name's name, parent object, parent's LMV, and ect.
3897  *
3898  * \retval              positive number if no need to repair
3899  * \retval              zero for repaired successfully
3900  * \retval              negative error number on failure
3901  */
3902 int lfsck_namespace_repair_dangling(const struct lu_env *env,
3903                                     struct lfsck_component *com,
3904                                     struct dt_object *child,
3905                                     struct lfsck_namespace_req *lnr)
3906 {
3907         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3908         struct lu_attr                  *la     = &info->lti_la;
3909         struct dt_allocation_hint       *hint   = &info->lti_hint;
3910         struct dt_object_format         *dof    = &info->lti_dof;
3911         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
3912         struct dt_object                *parent = lnr->lnr_obj;
3913         const struct lu_name            *cname;
3914         struct linkea_data               ldata  = { 0 };
3915         struct lustre_handle             lh     = { 0 };
3916         struct lu_buf                    linkea_buf;
3917         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3918         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
3919         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3920         struct thandle                  *th     = NULL;
3921         int                              rc     = 0;
3922         __u16                            type   = lnr->lnr_type;
3923         bool                             create;
3924         ENTRY;
3925
3926         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
3927         if (bk->lb_param & LPF_CREATE_MDTOBJ)
3928                 create = true;
3929         else
3930                 create = false;
3931
3932         if (!create || bk->lb_param & LPF_DRYRUN)
3933                 GOTO(log, rc = 0);
3934
3935         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
3936         if (rc != 0)
3937                 GOTO(log, rc);
3938
3939         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
3940         if (rc != 0)
3941                 GOTO(log, rc);
3942
3943         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3944                               MDS_INODELOCK_UPDATE, LCK_EX);
3945         if (rc != 0)
3946                 GOTO(log, rc);
3947
3948         rc = lfsck_namespace_check_exist(env, parent, child, lnr->lnr_name);
3949         if (rc != 0)
3950                 GOTO(log, rc);
3951
3952         th = dt_trans_create(env, dev);
3953         if (IS_ERR(th))
3954                 GOTO(log, rc = PTR_ERR(th));
3955
3956         /* Set the ctime as zero, then others can know it is created for
3957          * repairing dangling name entry by LFSCK. And if the LFSCK made
3958          * wrong decision and the real MDT-object has been found later,
3959          * then the LFSCK has chance to fix the incosistency properly. */
3960         memset(la, 0, sizeof(*la));
3961         la->la_mode = (type & S_IFMT) | 0600;
3962         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
3963                         LA_ATIME | LA_MTIME | LA_CTIME;
3964
3965         child->do_ops->do_ah_init(env, hint, parent, child,
3966                                   la->la_mode & S_IFMT);
3967
3968         memset(dof, 0, sizeof(*dof));
3969         dof->dof_type = dt_mode_to_dft(type);
3970         /* If the target is a regular file, then the LFSCK will only create
3971          * the MDT-object without stripes (dof->dof_reg.striped = 0). related
3972          * OST-objects will be created when write open. */
3973
3974         /* 1a. create child. */
3975         rc = dt_declare_create(env, child, la, hint, dof, th);
3976         if (rc != 0)
3977                 GOTO(stop, rc);
3978
3979         if (S_ISDIR(type)) {
3980                 if (unlikely(!dt_try_as_dir(env, child)))
3981                         GOTO(stop, rc = -ENOTDIR);
3982
3983                 /* 2a. insert dot into child dir */
3984                 rec->rec_type = S_IFDIR;
3985                 rec->rec_fid = lfsck_dto2fid(child);
3986                 rc = dt_declare_insert(env, child,
3987                                        (const struct dt_rec *)rec,
3988                                        (const struct dt_key *)dot, th);
3989                 if (rc != 0)
3990                         GOTO(stop, rc);
3991
3992                 /* 3a. insert dotdot into child dir */
3993                 rec->rec_fid = lfsck_dto2fid(parent);
3994                 rc = dt_declare_insert(env, child,
3995                                        (const struct dt_rec *)rec,
3996                                        (const struct dt_key *)dotdot, th);
3997                 if (rc != 0)
3998                         GOTO(stop, rc);
3999
4000                 /* 4a. increase child nlink */
4001                 rc = dt_declare_ref_add(env, child, th);
4002                 if (rc != 0)
4003                         GOTO(stop, rc);
4004         }
4005
4006         /* 5a. insert linkEA for child */
4007         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
4008                        ldata.ld_leh->leh_len);
4009         rc = dt_declare_xattr_set(env, child, &linkea_buf,
4010                                   XATTR_NAME_LINK, 0, th);
4011         if (rc != 0)
4012                 GOTO(stop, rc);
4013
4014         rc = dt_trans_start(env, dev, th);
4015         if (rc != 0)
4016                 GOTO(stop, rc = (rc == -EEXIST ? 1 : rc));
4017
4018         dt_write_lock(env, child, 0);
4019         /* 1b. create child */
4020         rc = dt_create(env, child, la, hint, dof, th);
4021         if (rc != 0)
4022                 GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc));
4023
4024         if (S_ISDIR(type)) {
4025                 if (unlikely(!dt_try_as_dir(env, child)))
4026                         GOTO(unlock, rc = -ENOTDIR);
4027
4028                 /* 2b. insert dot into child dir */
4029                 rec->rec_type = S_IFDIR;
4030                 rec->rec_fid = lfsck_dto2fid(child);
4031                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
4032                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
4033                 if (rc != 0)
4034                         GOTO(unlock, rc);
4035
4036                 /* 3b. insert dotdot into child dir */
4037                 rec->rec_fid = lfsck_dto2fid(parent);
4038                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
4039                                (const struct dt_key *)dotdot, th,
4040                                BYPASS_CAPA, 1);
4041                 if (rc != 0)
4042                         GOTO(unlock, rc);
4043
4044                 /* 4b. increase child nlink */
4045                 rc = dt_ref_add(env, child, th);
4046                 if (rc != 0)
4047                         GOTO(unlock, rc);
4048         }
4049
4050         /* 5b. insert linkEA for child. */
4051         rc = dt_xattr_set(env, child, &linkea_buf,
4052                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
4053
4054         GOTO(unlock, rc);
4055
4056 unlock:
4057         dt_write_unlock(env, child);
4058
4059 stop:
4060         dt_trans_stop(env, dev, th);
4061
4062 log:
4063         lfsck_ibits_unlock(&lh, LCK_EX);
4064         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
4065                "reference for: parent "DFID", child "DFID", type %u, "
4066                "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
4067                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
4068                type, cname->ln_name,
4069                create ? "Create the lost OST-object as required" :
4070                         "Keep the MDT-object there by default", rc);
4071
4072         if (rc <= 0) {
4073                 struct lfsck_namespace *ns = com->lc_file_ram;
4074
4075                 ns->ln_flags |= LF_INCONSISTENT;
4076         }
4077
4078         return rc;
4079 }
4080
4081 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
4082                                                 struct lfsck_component *com,
4083                                                 struct lfsck_assistant_req *lar)
4084 {
4085         struct lfsck_thread_info   *info     = lfsck_env_info(env);
4086         struct lu_attr             *la       = &info->lti_la;
4087         struct lfsck_instance      *lfsck    = com->lc_lfsck;
4088         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
4089         struct lfsck_namespace     *ns       = com->lc_file_ram;
4090         struct linkea_data          ldata    = { 0 };
4091         const struct lu_name       *cname;
4092         struct thandle             *handle   = NULL;
4093         struct lfsck_namespace_req *lnr      =
4094                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
4095         struct dt_object           *dir      = lnr->lnr_obj;
4096         struct dt_object           *obj      = NULL;
4097         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
4098         struct dt_device           *dev;
4099         struct lustre_handle        lh       = { 0 };
4100         bool                        repaired = false;
4101         bool                        dtlocked = false;
4102         bool                        remove;
4103         bool                        newdata;
4104         bool                        log      = false;
4105         int                         idx;
4106         int                         count    = 0;
4107         int                         rc;
4108         enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
4109         ENTRY;
4110
4111         if (lnr->lnr_attr & LUDA_UPGRADE) {
4112                 ns->ln_flags |= LF_UPGRADE;
4113                 ns->ln_dirent_repaired++;
4114                 repaired = true;
4115         } else if (lnr->lnr_attr & LUDA_REPAIR) {
4116                 ns->ln_flags |= LF_INCONSISTENT;
4117                 ns->ln_dirent_repaired++;
4118                 repaired = true;
4119         }
4120
4121         if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
4122                 if (strcmp(lnr->lnr_name, dotdot) != 0)
4123                         LBUG();
4124                 else
4125                         rc = lfsck_namespace_trace_update(env, com, pfid,
4126                                                 LNTF_CHECK_PARENT, true);
4127
4128                 GOTO(out, rc);
4129         }
4130
4131         if (lnr->lnr_name[0] == '.' &&
4132             (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
4133                 GOTO(out, rc = 0);
4134
4135         idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
4136         if (idx < 0)
4137                 GOTO(out, rc = idx);
4138
4139         if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
4140                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
4141                         GOTO(out, rc = 0);
4142
4143                 dev = lfsck->li_next;
4144         } else {
4145                 struct lfsck_tgt_desc *ltd;
4146
4147                 /* Usually, some local filesystem consistency verification
4148                  * tools can guarantee the local namespace tree consistenct.
4149                  * So the LFSCK will only verify the remote directory. */
4150                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
4151                         rc = lfsck_namespace_trace_update(env, com, pfid,
4152                                                 LNTF_CHECK_PARENT, true);
4153
4154                         GOTO(out, rc);
4155                 }
4156
4157                 ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
4158                 if (unlikely(ltd == NULL)) {
4159                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
4160                                "did not join the namespace LFSCK\n",
4161                                lfsck_lfsck2name(lfsck), idx);
4162                         ns->ln_flags |= LF_INCOMPLETE;
4163
4164                         GOTO(out, rc = -ENODEV);
4165                 }
4166
4167                 dev = ltd->ltd_tgt;
4168         }
4169
4170         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
4171         if (IS_ERR(obj))
4172                 GOTO(out, rc = PTR_ERR(obj));
4173
4174         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
4175         if (dt_object_exists(obj) == 0) {
4176
4177 dangling:
4178                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
4179                 if (rc == 0) {
4180                         type = LNIT_DANGLING;
4181                         rc = lfsck_namespace_repair_dangling(env, com,
4182                                                              obj, lnr);
4183                         if (rc == 0)
4184                                 repaired = true;
4185                 }
4186
4187                 GOTO(out, rc);
4188         }
4189
4190         if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
4191
4192 again:
4193                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4194                                       MDS_INODELOCK_UPDATE |
4195                                       MDS_INODELOCK_XATTR, LCK_EX);
4196                 if (rc != 0)
4197                         GOTO(out, rc);
4198
4199                 handle = dt_trans_create(env, dev);
4200                 if (IS_ERR(handle))
4201                         GOTO(out, rc = PTR_ERR(handle));
4202
4203                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
4204                 if (rc != 0)
4205                         GOTO(stop, rc);
4206
4207                 rc = dt_trans_start(env, dev, handle);
4208                 if (rc != 0)
4209                         GOTO(stop, rc);
4210
4211                 dt_write_lock(env, obj, 0);
4212                 dtlocked = true;
4213         }
4214
4215         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
4216         if (rc != 0)
4217                 GOTO(stop, rc);
4218
4219         rc = lfsck_links_read(env, obj, &ldata);
4220         if (unlikely(rc == -ENOENT)) {
4221                 if (handle != NULL) {
4222                         dt_write_unlock(env, obj);
4223                         dtlocked = false;
4224
4225                         dt_trans_stop(env, dev, handle);
4226                         handle = NULL;
4227
4228                         lfsck_ibits_unlock(&lh, LCK_EX);
4229                 }
4230
4231                 /* It may happen when the remote object has been removed,
4232                  * but the local MDT is not aware of that. */
4233                 goto dangling;
4234         } else if (rc == 0) {
4235                 count = ldata.ld_leh->leh_reccount;
4236                 rc = linkea_links_find(&ldata, cname, pfid);
4237                 if ((rc == 0) &&
4238                     (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) {
4239                         if ((lfsck_object_type(obj) & S_IFMT) !=
4240                             lnr->lnr_type) {
4241                                 ns->ln_flags |= LF_INCONSISTENT;
4242                                 type = LNIT_BAD_TYPE;
4243                         }
4244
4245                         goto record;
4246                 }
4247
4248                 ns->ln_flags |= LF_INCONSISTENT;
4249
4250                 /* If the file type stored in the name entry does not match
4251                  * the file type claimed by the object, and the object does
4252                  * not recognize the name entry, then it is quite possible
4253                  * that the name entry is corrupted. */
4254                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) {
4255                         type = LNIT_BAD_DIRENT;
4256
4257                         GOTO(stop, rc = 0);
4258                 }
4259
4260                 /* For sub-dir object, we cannot make sure whether the sub-dir
4261                  * back references the parent via ".." name entry correctly or
4262                  * not in the LFSCK first-stage scanning. It may be that the
4263                  * (remote) sub-dir ".." name entry has no parent FID after
4264                  * file-level backup/restore and its linkEA may be wrong.
4265                  * So under such case, we should replace the linkEA according
4266                  * to current name entry. But this needs to be done during the
4267                  * LFSCK second-stage scanning. The LFSCK will record the name
4268                  * entry for further possible using. */
4269                 remove = false;
4270                 newdata = false;
4271                 goto nodata;
4272         } else if (unlikely(rc == -EINVAL)) {
4273                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
4274                         type = LNIT_BAD_TYPE;
4275
4276                 count = 1;
4277                 ns->ln_flags |= LF_INCONSISTENT;
4278                 /* The magic crashed, we are not sure whether there are more
4279                  * corrupt data in the linkea, so remove all linkea entries. */
4280                 remove = true;
4281                 newdata = true;
4282                 goto nodata;
4283         } else if (rc == -ENODATA) {
4284                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
4285                         type = LNIT_BAD_TYPE;
4286
4287                 count = 1;
4288                 ns->ln_flags |= LF_UPGRADE;
4289                 remove = false;
4290                 newdata = true;
4291
4292 nodata:
4293                 if (bk->lb_param & LPF_DRYRUN) {
4294                         ns->ln_linkea_repaired++;
4295                         repaired = true;
4296                         log = true;
4297                         goto record;
4298                 }
4299
4300                 if (!lustre_handle_is_used(&lh))
4301                         goto again;
4302
4303                 if (remove) {
4304                         LASSERT(newdata);
4305
4306                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
4307                                           BYPASS_CAPA);
4308                         if (rc != 0)
4309                                 GOTO(stop, rc);
4310                 }
4311
4312                 if (newdata) {
4313                         rc = linkea_data_new(&ldata,
4314                                         &lfsck_env_info(env)->lti_linkea_buf);
4315                         if (rc != 0)
4316                                 GOTO(stop, rc);
4317                 }
4318
4319                 rc = linkea_add_buf(&ldata, cname, pfid);
4320                 if (rc != 0)
4321                         GOTO(stop, rc);
4322
4323                 rc = lfsck_links_write(env, obj, &ldata, handle);
4324                 if (rc != 0)
4325                         GOTO(stop, rc);
4326
4327                 count = ldata.ld_leh->leh_reccount;
4328                 if (!S_ISDIR(lfsck_object_type(obj)) ||
4329                     !dt_object_remote(obj)) {
4330                         ns->ln_linkea_repaired++;
4331                         repaired = true;
4332                         log = true;
4333                 }
4334         } else {
4335                 GOTO(stop, rc);
4336         }
4337
4338 record:
4339         LASSERT(count > 0);
4340
4341         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
4342         if (rc != 0)
4343                 GOTO(stop, rc);
4344
4345         if ((count == 1 && la->la_nlink == 1) ||
4346             S_ISDIR(lfsck_object_type(obj)))
4347                 /* Usually, it is for single linked object or dir, do nothing.*/
4348                 GOTO(stop, rc);
4349
4350         /* Following modification will be in another transaction.  */
4351         if (handle != NULL) {
4352                 dt_write_unlock(env, obj);
4353                 dtlocked = false;
4354
4355                 dt_trans_stop(env, dev, handle);
4356                 handle = NULL;
4357
4358                 lfsck_ibits_unlock(&lh, LCK_EX);
4359         }
4360
4361         ns->ln_mul_linked_checked++;
4362         rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
4363                                           LNTF_CHECK_LINKEA, true);
4364
4365         GOTO(out, rc);
4366
4367 stop:
4368         if (dtlocked)
4369                 dt_write_unlock(env, obj);
4370
4371         if (handle != NULL && !IS_ERR(handle))
4372                 dt_trans_stop(env, dev, handle);
4373
4374 out:
4375         lfsck_ibits_unlock(&lh, LCK_EX);
4376
4377         if (rc >= 0) {
4378                 switch (type) {
4379                 case LNIT_BAD_TYPE:
4380                         log = false;
4381                         rc = lfsck_namespace_repair_dirent(env, com, dir,
4382                                         obj, lnr->lnr_name, lnr->lnr_name,
4383                                         lnr->lnr_type, true, false);
4384                         if (rc > 0)
4385                                 repaired = true;
4386                         break;
4387                 case LNIT_BAD_DIRENT:
4388                         log = false;
4389                         /* XXX: This is a bad dirent, we do not know whether
4390                          *      the original name entry reference a regular
4391                          *      file or a directory, then keep the parent's
4392                          *      nlink count unchanged here. */
4393                         rc = lfsck_namespace_repair_dirent(env, com, dir,
4394                                         obj, lnr->lnr_name, lnr->lnr_name,
4395                                         lnr->lnr_type, false, false);
4396                         if (rc > 0)
4397                                 repaired = true;
4398                         break;
4399                 default:
4400                         break;
4401                 }
4402         }
4403
4404         down_write(&com->lc_sem);
4405         if (rc < 0) {
4406                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
4407                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
4408                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
4409                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
4410                        lnr->lnr_namelen, lnr->lnr_name, rc);
4411
4412                 lfsck_namespace_record_failure(env, lfsck, ns);
4413                 if (!(bk->lb_param & LPF_FAILOUT))
4414                         rc = 0;
4415         } else {
4416                 if (log)
4417                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
4418                                "repaired the entry: "DFID", parent "DFID
4419                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
4420                                PFID(&lnr->lnr_fid),
4421                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
4422                                lnr->lnr_namelen, lnr->lnr_name);
4423
4424                 if (repaired) {
4425                         ns->ln_items_repaired++;
4426
4427                         switch (type) {
4428                         case LNIT_DANGLING:
4429                                 ns->ln_dangling_repaired++;
4430                                 break;
4431                         case LNIT_BAD_TYPE:
4432                                 ns->ln_bad_type_repaired++;
4433                                 break;
4434                         case LNIT_BAD_DIRENT:
4435                                 ns->ln_dirent_repaired++;
4436                                 break;
4437                         default:
4438                                 break;
4439                         }
4440
4441                         if (bk->lb_param & LPF_DRYRUN &&
4442                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
4443                                 lfsck_pos_fill(env, lfsck,
4444                                                &ns->ln_pos_first_inconsistent,
4445                                                false);
4446                 }
4447
4448                 rc = 0;
4449         }
4450         up_write(&com->lc_sem);
4451
4452         if (obj != NULL && !IS_ERR(obj))
4453                 lfsck_object_put(env, obj);
4454         return rc;
4455 }
4456
4457 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
4458                                                 struct lfsck_component *com)
4459 {
4460         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4461         struct ptlrpc_thread    *thread = &lfsck->li_thread;
4462         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
4463         struct lfsck_namespace  *ns     = com->lc_file_ram;
4464         struct dt_object        *obj    = com->lc_obj;
4465         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
4466         struct dt_object        *target;
4467         struct dt_it            *di;
4468         struct dt_key           *key;
4469         struct lu_fid            fid;
4470         int                      rc;
4471         __u8                     flags  = 0;
4472         ENTRY;
4473
4474         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
4475                lfsck_lfsck2name(lfsck));
4476
4477         com->lc_new_checked = 0;
4478         com->lc_new_scanned = 0;
4479         com->lc_time_last_checkpoint = cfs_time_current();
4480         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4481                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4482
4483         di = iops->init(env, obj, 0, BYPASS_CAPA);
4484         if (IS_ERR(di))
4485                 RETURN(PTR_ERR(di));
4486
4487         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
4488         rc = iops->get(env, di, (const struct dt_key *)&fid);
4489         if (rc < 0)
4490                 GOTO(fini, rc);
4491
4492         /* Skip the start one, which either has been processed or non-exist. */
4493         rc = iops->next(env, di);
4494         if (rc != 0)
4495                 GOTO(put, rc);
4496
4497         do {
4498                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
4499                     cfs_fail_val > 0) {
4500                         struct l_wait_info lwi;
4501
4502                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
4503                                           NULL, NULL);
4504                         l_wait_event(thread->t_ctl_waitq,
4505                                      !thread_is_running(thread),
4506                                      &lwi);
4507
4508                         if (unlikely(!thread_is_running(thread)))
4509                                 GOTO(put, rc = 0);
4510                 }
4511
4512                 key = iops->key(env, di);
4513                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
4514                 if (!fid_is_sane(&fid)) {
4515                         rc = 0;
4516                         goto checkpoint;
4517                 }
4518
4519                 target = lfsck_object_find(env, lfsck, &fid);
4520                 if (IS_ERR(target)) {
4521                         rc = PTR_ERR(target);
4522                         goto checkpoint;
4523                 }
4524
4525                 if (dt_object_exists(target)) {
4526                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
4527                         if (rc == 0) {
4528                                 rc = lfsck_namespace_double_scan_one(env, com,
4529                                                                 target, flags);
4530                                 if (rc == -ENOENT)
4531                                         rc = 0;
4532                         }
4533                 }
4534
4535                 lfsck_object_put(env, target);
4536
4537 checkpoint:
4538                 down_write(&com->lc_sem);
4539                 com->lc_new_checked++;
4540                 com->lc_new_scanned++;
4541                 ns->ln_fid_latest_scanned_phase2 = fid;
4542                 if (rc > 0)
4543                         ns->ln_objs_repaired_phase2++;
4544                 else if (rc < 0)
4545                         ns->ln_objs_failed_phase2++;
4546                 up_write(&com->lc_sem);
4547
4548                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4549                         GOTO(put, rc);
4550
4551                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
4552                                               cfs_time_current())) &&
4553                     com->lc_new_checked != 0) {
4554                         down_write(&com->lc_sem);
4555                         ns->ln_run_time_phase2 +=
4556                                 cfs_duration_sec(cfs_time_current() +
4557                                 HALF_SEC - com->lc_time_last_checkpoint);
4558                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
4559                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
4560                         com->lc_new_checked = 0;
4561                         rc = lfsck_namespace_store(env, com, false);
4562                         up_write(&com->lc_sem);
4563                         if (rc != 0)
4564                                 GOTO(put, rc);
4565
4566                         com->lc_time_last_checkpoint = cfs_time_current();
4567                         com->lc_time_next_checkpoint =
4568                                 com->lc_time_last_checkpoint +
4569                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4570                 }
4571
4572                 lfsck_control_speed_by_self(com);
4573                 if (unlikely(!thread_is_running(thread)))
4574                         GOTO(put, rc = 0);
4575
4576                 rc = iops->next(env, di);
4577         } while (rc == 0);
4578
4579         GOTO(put, rc);
4580
4581 put:
4582         iops->put(env, di);
4583
4584 fini:
4585         iops->fini(env, di);
4586
4587         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
4588                lfsck_lfsck2name(lfsck), rc);
4589
4590         return rc;
4591 }
4592
4593 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
4594                                                struct lfsck_component *com,
4595                                                struct lfsck_position *pos)
4596 {
4597         struct lfsck_assistant_data     *lad = com->lc_data;
4598         struct lfsck_namespace_req      *lnr;
4599
4600         if (list_empty(&lad->lad_req_list))
4601                 return;
4602
4603         lnr = list_entry(lad->lad_req_list.next,
4604                          struct lfsck_namespace_req,
4605                          lnr_lar.lar_list);
4606         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
4607         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
4608         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
4609 }
4610
4611 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
4612                                               struct lfsck_component *com,
4613                                               int rc)
4614 {
4615         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4616         struct lfsck_namespace  *ns     = com->lc_file_ram;
4617
4618         down_write(&com->lc_sem);
4619         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
4620                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4621         ns->ln_time_last_checkpoint = cfs_time_current_sec();
4622         ns->ln_objs_checked_phase2 += com->lc_new_checked;
4623         com->lc_new_checked = 0;
4624
4625         if (rc > 0) {
4626                 if (ns->ln_flags & LF_INCOMPLETE)
4627                         ns->ln_status = LS_PARTIAL;
4628                 else
4629                         ns->ln_status = LS_COMPLETED;
4630                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
4631                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
4632                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
4633                 ns->ln_success_count++;
4634         } else if (rc == 0) {
4635                 ns->ln_status = lfsck->li_status;
4636                 if (ns->ln_status == 0)
4637                         ns->ln_status = LS_STOPPED;
4638         } else {
4639                 ns->ln_status = LS_FAILED;
4640         }
4641
4642         rc = lfsck_namespace_store(env, com, false);
4643         up_write(&com->lc_sem);
4644
4645         return rc;
4646 }
4647
/**
 * Placeholder for the la_sync_failures callback of the namespace LFSCK
 * assistant. It is not implemented yet and does nothing with its arguments.
 *
 * \param[in] env       pointer to the thread context (unused)
 * \param[in] com       pointer to the lfsck component (unused)
 * \param[in] lr        pointer to the lfsck request (unused)
 */
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
                                                    struct lfsck_component *com,
                                                    struct lfsck_request *lr)
{
        /* XXX: TBD */
}
4654
/* Callback table of the namespace LFSCK assistant. It is handed to
 * lfsck_assistant_data_init() when lfsck_namespace_setup() creates the
 * namespace component. */
struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
        .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
        .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
        .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
        .la_double_scan_result  = lfsck_namespace_double_scan_result,
        .la_req_fini            = lfsck_namespace_assistant_req_fini,
        /* Currently an empty stub. */
        .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
};
4663
4664 /**
4665  * Verify the specified linkEA entry for the given directory object.
4666  * If the object has no such linkEA entry or it has more other linkEA
4667  * entries, then re-generate the linkEA with the given information.
4668  *
4669  * \param[in] env       pointer to the thread context
4670  * \param[in] dev       pointer to the dt_device
4671  * \param[in] obj       pointer to the dt_object to be handled
4672  * \param[in] cname     the name for the child in the parent directory
4673  * \param[in] pfid      the parent directory's FID for the linkEA
4674  *
4675  * \retval              0 for success
4676  * \retval              negative error number on failure
4677  */
4678 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
4679                         struct dt_object *obj, const struct lu_name *cname,
4680                         const struct lu_fid *pfid)
4681 {
4682         struct linkea_data       ldata  = { 0 };
4683         struct lu_buf            linkea_buf;
4684         struct thandle          *th;
4685         int                      rc;
4686         int                      fl     = LU_XATTR_CREATE;
4687         bool                     dirty  = false;
4688         ENTRY;
4689
4690         LASSERT(S_ISDIR(lfsck_object_type(obj)));
4691
4692         rc = lfsck_links_read(env, obj, &ldata);
4693         if (rc == -ENODATA) {
4694                 dirty = true;
4695         } else if (rc == 0) {
4696                 fl = LU_XATTR_REPLACE;
4697                 if (ldata.ld_leh->leh_reccount != 1) {
4698                         dirty = true;
4699                 } else {
4700                         rc = linkea_links_find(&ldata, cname, pfid);
4701                         if (rc != 0)
4702                                 dirty = true;
4703                 }
4704         }
4705
4706         if (!dirty)
4707                 RETURN(rc);
4708
4709         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
4710         if (rc != 0)
4711                 RETURN(rc);
4712
4713         rc = linkea_add_buf(&ldata, cname, pfid);
4714         if (rc != 0)
4715                 RETURN(rc);
4716
4717         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
4718                        ldata.ld_leh->leh_len);
4719         th = dt_trans_create(env, dev);
4720         if (IS_ERR(th))
4721                 RETURN(PTR_ERR(th));
4722
4723         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
4724                                   XATTR_NAME_LINK, fl, th);
4725         if (rc != 0)
4726                 GOTO(stop, rc);
4727
4728         rc = dt_trans_start_local(env, dev, th);
4729         if (rc != 0)
4730                 GOTO(stop, rc);
4731
4732         dt_write_lock(env, obj, 0);
4733         rc = dt_xattr_set(env, obj, &linkea_buf,
4734                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
4735         dt_write_unlock(env, obj);
4736
4737         GOTO(stop, rc);
4738
4739 stop:
4740         dt_trans_stop(env, dev, th);
4741         return rc;
4742 }
4743
4744 /**
4745  * Get the name and parent directory's FID from the first linkEA entry.
4746  *
4747  * \param[in] env       pointer to the thread context
4748  * \param[in] obj       pointer to the object which get linkEA from
4749  * \param[out] name     pointer to the buffer to hold the name
4750  *                      in the first linkEA entry
4751  * \param[out] pfid     pointer to the buffer to hold the parent
4752  *                      directory's FID in the first linkEA entry
4753  *
4754  * \retval              0 for success
4755  * \retval              negative error number on failure
4756  */
4757 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
4758                           char *name, struct lu_fid *pfid)
4759 {
4760         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
4761         struct linkea_data        ldata = { 0 };
4762         int                       rc;
4763
4764         rc = lfsck_links_read(env, obj, &ldata);
4765         if (rc != 0)
4766                 return rc;
4767
4768         linkea_first_entry(&ldata);
4769         if (ldata.ld_lee == NULL)
4770                 return -ENODATA;
4771
4772         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
4773         /* To guarantee the 'name' is terminated with '0'. */
4774         memcpy(name, cname->ln_name, cname->ln_namelen);
4775         name[cname->ln_namelen] = 0;
4776
4777         return 0;
4778 }
4779
4780 /**
4781  * Remove the name entry from the parent directory.
4782  *
4783  * No need to care about the object referenced by the name entry,
4784  * either the name entry is invalid or redundant, or the referenced
4785  * object has been processed has been or will be handled by others.
4786  *
4787  * \param[in] env       pointer to the thread context
4788  * \param[in] lfsck     pointer to the lfsck instance
4789  * \param[in] parent    pointer to the lost+found object
4790  * \param[in] name      the name for the name entry to be removed
4791  * \param[in] type      the type for the name entry to be removed
4792  *
4793  * \retval              0 for success
4794  * \retval              negative error number on failure
4795  */
4796 int lfsck_remove_name_entry(const struct lu_env *env,
4797                             struct lfsck_instance *lfsck,
4798                             struct dt_object *parent,
4799                             const char *name, __u32 type)
4800 {
4801         struct dt_device        *dev    = lfsck->li_next;
4802         struct thandle          *th;
4803         struct lustre_handle     lh     = { 0 };
4804         int                      rc;
4805         ENTRY;
4806
4807         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4808                               MDS_INODELOCK_UPDATE, LCK_EX);
4809         if (rc != 0)
4810                 RETURN(rc);
4811
4812         th = dt_trans_create(env, dev);
4813         if (IS_ERR(th))
4814                 GOTO(unlock, rc = PTR_ERR(th));
4815
4816         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
4817         if (rc != 0)
4818                 GOTO(stop, rc);
4819
4820         if (S_ISDIR(type)) {
4821                 rc = dt_declare_ref_del(env, parent, th);
4822                 if (rc != 0)
4823                         GOTO(stop, rc);
4824         }
4825
4826         rc = dt_trans_start(env, dev, th);
4827         if (rc != 0)
4828                 GOTO(stop, rc);
4829
4830         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
4831                        BYPASS_CAPA);
4832         if (rc != 0)
4833                 GOTO(stop, rc);
4834
4835         if (S_ISDIR(type)) {
4836                 dt_write_lock(env, parent, 0);
4837                 rc = dt_ref_del(env, parent, th);
4838                 dt_write_unlock(env, parent);
4839         }
4840
4841         GOTO(stop, rc);
4842
4843 stop:
4844         dt_trans_stop(env, dev, th);
4845
4846 unlock:
4847         lfsck_ibits_unlock(&lh, LCK_EX);
4848
4849         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
4850                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4851                PFID(lfsck_dto2fid(parent)), name, type, rc);
4852
4853         return rc;
4854 }
4855
4856 /**
4857  * Update the object's name entry with the given FID.
4858  *
4859  * \param[in] env       pointer to the thread context
4860  * \param[in] lfsck     pointer to the lfsck instance
4861  * \param[in] parent    pointer to the parent directory that holds
4862  *                      the name entry
4863  * \param[in] name      the name for the entry to be updated
4864  * \param[in] pfid      the new PFID for the name entry
4865  * \param[in] type      the type for the name entry to be updated
4866  *
4867  * \retval              0 for success
4868  * \retval              negative error number on failure
4869  */
4870 int lfsck_update_name_entry(const struct lu_env *env,
4871                             struct lfsck_instance *lfsck,
4872                             struct dt_object *parent, const char *name,
4873                             const struct lu_fid *pfid, __u32 type)
4874 {
4875         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
4876         struct dt_device        *dev    = lfsck->li_next;
4877         struct lustre_handle     lh     = { 0 };
4878         struct thandle          *th;
4879         int                      rc;
4880         bool                     exists = true;
4881         ENTRY;
4882
4883         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4884                               MDS_INODELOCK_UPDATE, LCK_EX);
4885         if (rc != 0)
4886                 RETURN(rc);
4887
4888         th = dt_trans_create(env, dev);
4889         if (IS_ERR(th))
4890                 GOTO(unlock, rc = PTR_ERR(th));
4891
4892         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
4893         if (rc != 0)
4894                 GOTO(stop, rc);
4895
4896         rec->rec_type = type;
4897         rec->rec_fid = pfid;
4898         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
4899                                (const struct dt_key *)name, th);
4900         if (rc != 0)
4901                 GOTO(stop, rc);
4902
4903         rc = dt_declare_ref_add(env, parent, th);
4904         if (rc != 0)
4905                 GOTO(stop, rc);
4906
4907         rc = dt_trans_start(env, dev, th);
4908         if (rc != 0)
4909                 GOTO(stop, rc);
4910
4911         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
4912                        BYPASS_CAPA);
4913         if (rc == -ENOENT) {
4914                 exists = false;
4915                 rc = 0;
4916         }
4917
4918         if (rc != 0)
4919                 GOTO(stop, rc);
4920
4921         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
4922                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
4923         if (rc == 0 && S_ISDIR(type) && !exists) {
4924                 dt_write_lock(env, parent, 0);
4925                 rc = dt_ref_add(env, parent, th);
4926                 dt_write_unlock(env, parent);
4927         }
4928
4929         GOTO(stop, rc);
4930
4931 stop:
4932         dt_trans_stop(env, dev, th);
4933
4934 unlock:
4935         lfsck_ibits_unlock(&lh, LCK_EX);
4936
4937         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
4938                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4939                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
4940
4941         return rc;
4942 }
4943
4944 int lfsck_namespace_setup(const struct lu_env *env,
4945                           struct lfsck_instance *lfsck)
4946 {
4947         struct lfsck_component  *com;
4948         struct lfsck_namespace  *ns;
4949         struct dt_object        *root = NULL;
4950         struct dt_object        *obj;
4951         int                      rc;
4952         ENTRY;
4953
4954         LASSERT(lfsck->li_master);
4955
4956         OBD_ALLOC_PTR(com);
4957         if (com == NULL)
4958                 RETURN(-ENOMEM);
4959
4960         INIT_LIST_HEAD(&com->lc_link);
4961         INIT_LIST_HEAD(&com->lc_link_dir);
4962         init_rwsem(&com->lc_sem);
4963         atomic_set(&com->lc_ref, 1);
4964         com->lc_lfsck = lfsck;
4965         com->lc_type = LFSCK_TYPE_NAMESPACE;
4966         com->lc_ops = &lfsck_namespace_ops;
4967         com->lc_data = lfsck_assistant_data_init(
4968                         &lfsck_namespace_assistant_ops,
4969                         "lfsck_namespace");
4970         if (com->lc_data == NULL)
4971                 GOTO(out, rc = -ENOMEM);
4972
4973         com->lc_file_size = sizeof(struct lfsck_namespace);
4974         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
4975         if (com->lc_file_ram == NULL)
4976                 GOTO(out, rc = -ENOMEM);
4977
4978         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
4979         if (com->lc_file_disk == NULL)
4980                 GOTO(out, rc = -ENOMEM);
4981
4982         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
4983         if (IS_ERR(root))
4984                 GOTO(out, rc = PTR_ERR(root));
4985
4986         if (unlikely(!dt_try_as_dir(env, root)))
4987                 GOTO(out, rc = -ENOTDIR);
4988
4989         obj = local_index_find_or_create(env, lfsck->li_los, root,
4990                                          lfsck_namespace_name,
4991                                          S_IFREG | S_IRUGO | S_IWUSR,
4992                                          &dt_lfsck_features);
4993         if (IS_ERR(obj))
4994                 GOTO(out, rc = PTR_ERR(obj));
4995
4996         com->lc_obj = obj;
4997         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
4998         if (rc != 0)
4999                 GOTO(out, rc);
5000
5001         rc = lfsck_namespace_load(env, com);
5002         if (rc > 0)
5003                 rc = lfsck_namespace_reset(env, com, true);
5004         else if (rc == -ENODATA)
5005                 rc = lfsck_namespace_init(env, com);
5006         if (rc != 0)
5007                 GOTO(out, rc);
5008
5009         ns = com->lc_file_ram;
5010         switch (ns->ln_status) {
5011         case LS_INIT:
5012         case LS_COMPLETED:
5013         case LS_FAILED:
5014         case LS_STOPPED:
5015                 spin_lock(&lfsck->li_lock);
5016                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5017                 spin_unlock(&lfsck->li_lock);
5018                 break;
5019         default:
5020                 CERROR("%s: unknown lfsck_namespace status %d\n",
5021                        lfsck_lfsck2name(lfsck), ns->ln_status);
5022                 /* fall through */
5023         case LS_SCANNING_PHASE1:
5024         case LS_SCANNING_PHASE2:
5025                 /* No need to store the status to disk right now.
5026                  * If the system crashed before the status stored,
5027                  * it will be loaded back when next time. */
5028                 ns->ln_status = LS_CRASHED;
5029                 /* fall through */
5030         case LS_PAUSED:
5031         case LS_CRASHED:
5032                 spin_lock(&lfsck->li_lock);
5033                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5034                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
5035                 spin_unlock(&lfsck->li_lock);
5036                 break;
5037         }
5038
5039         GOTO(out, rc = 0);
5040
5041 out:
5042         if (root != NULL && !IS_ERR(root))
5043                 lu_object_put(env, &root->do_lu);
5044         if (rc != 0) {
5045                 lfsck_component_cleanup(env, com);
5046                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
5047                        lfsck_lfsck2name(lfsck), rc);
5048         }
5049         return rc;
5050 }