Whamcloud - gitweb
LU-5511 lfsck: repair unmatched parent-child pairs
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/*
 * Magic value kept in lfsck_namespace::ln_magic.  lfsck_namespace_init()
 * stamps it into a fresh record and lfsck_namespace_load() treats any other
 * value as a broken record.
 */
#define LFSCK_NAMESPACE_MAGIC 0xA0629D03
45
/*
 * Positive results of lfsck_namespace_check_exist(); zero means the name
 * entry still references the expected object.
 */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD = 1,
        /* The name entry has been removed from the directory. */
        LFSCK_NAMEENTRY_REMOVED = 2,
        /* The name entry has been recreated and references another FID. */
        LFSCK_NAMEENTRY_RECREATED = 3,
};
51
/* Fixed name string for the namespace LFSCK. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 struct lfsck_namespace_req {
55         struct lfsck_assistant_req       lnr_lar;
56         struct dt_object                *lnr_obj;
57         struct lu_fid                    lnr_fid;
58         __u64                            lnr_oit_cookie;
59         __u64                            lnr_dir_cookie;
60         __u32                            lnr_attr;
61         __u32                            lnr_size;
62         __u16                            lnr_type;
63         __u16                            lnr_namelen;
64         char                             lnr_name[0];
65 };
66
67 static struct lfsck_namespace_req *
68 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
69                                    struct lu_dirent *ent, __u16 type)
70 {
71         struct lfsck_namespace_req *lnr;
72         int                         size;
73
74         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
75         OBD_ALLOC(lnr, size);
76         if (lnr == NULL)
77                 return ERR_PTR(-ENOMEM);
78
79         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
80         lu_object_get(&lfsck->li_obj_dir->do_lu);
81         lnr->lnr_obj = lfsck->li_obj_dir;
82         lnr->lnr_fid = ent->lde_fid;
83         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
84         lnr->lnr_dir_cookie = ent->lde_hash;
85         lnr->lnr_attr = ent->lde_attrs;
86         lnr->lnr_size = size;
87         lnr->lnr_type = type;
88         lnr->lnr_namelen = ent->lde_namelen;
89         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
90
91         return lnr;
92 }
93
94 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
95                                                struct lfsck_assistant_req *lar)
96 {
97         struct lfsck_namespace_req *lnr =
98                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
99
100         lu_object_put(env, &lnr->lnr_obj->do_lu);
101         OBD_FREE(lnr, lnr->lnr_size);
102 }
103
104 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
105                                       struct lfsck_namespace *src)
106 {
107         dst->ln_magic = le32_to_cpu(src->ln_magic);
108         dst->ln_status = le32_to_cpu(src->ln_status);
109         dst->ln_flags = le32_to_cpu(src->ln_flags);
110         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
111         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
112         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
113         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
114         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
115         dst->ln_time_last_checkpoint =
116                                 le64_to_cpu(src->ln_time_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
118                                  &src->ln_pos_latest_start);
119         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
120                                  &src->ln_pos_last_checkpoint);
121         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
122                                  &src->ln_pos_first_inconsistent);
123         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
124         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
125         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
126         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
127         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
128         dst->ln_objs_repaired_phase2 =
129                                 le64_to_cpu(src->ln_objs_repaired_phase2);
130         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
131         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
132         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
133         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
134                       &src->ln_fid_latest_scanned_phase2);
135         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
136         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
137         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
138         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
139         dst->ln_unknown_inconsistency =
140                                 le64_to_cpu(src->ln_unknown_inconsistency);
141         dst->ln_unmatched_pairs_repaired =
142                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
143 }
144
145 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
146                                       struct lfsck_namespace *src)
147 {
148         dst->ln_magic = cpu_to_le32(src->ln_magic);
149         dst->ln_status = cpu_to_le32(src->ln_status);
150         dst->ln_flags = cpu_to_le32(src->ln_flags);
151         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
152         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
153         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
154         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
155         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
156         dst->ln_time_last_checkpoint =
157                                 cpu_to_le64(src->ln_time_last_checkpoint);
158         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
159                                  &src->ln_pos_latest_start);
160         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
161                                  &src->ln_pos_last_checkpoint);
162         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
163                                  &src->ln_pos_first_inconsistent);
164         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
165         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
166         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
167         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
168         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
169         dst->ln_objs_repaired_phase2 =
170                                 cpu_to_le64(src->ln_objs_repaired_phase2);
171         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
172         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
173         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
174         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
175                       &src->ln_fid_latest_scanned_phase2);
176         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
177         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
178         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
179         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
180         dst->ln_unknown_inconsistency =
181                                 cpu_to_le64(src->ln_unknown_inconsistency);
182         dst->ln_unmatched_pairs_repaired =
183                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
184 }
185
186 static void lfsck_namespace_record_failure(const struct lu_env *env,
187                                            struct lfsck_instance *lfsck,
188                                            struct lfsck_namespace *ns)
189 {
190         struct lfsck_position pos;
191
192         ns->ln_items_failed++;
193         lfsck_pos_fill(env, lfsck, &pos, false);
194         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
195             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
196                 ns->ln_pos_first_inconsistent = pos;
197
198                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
199                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
200                        lfsck_lfsck2name(lfsck),
201                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
202                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
203                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
204         }
205 }
206
207 /**
208  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
209  * \retval 0: succeed.
210  * \retval -ve: failed cases.
211  */
212 static int lfsck_namespace_load(const struct lu_env *env,
213                                 struct lfsck_component *com)
214 {
215         int len = com->lc_file_size;
216         int rc;
217
218         rc = dt_xattr_get(env, com->lc_obj,
219                           lfsck_buf_get(env, com->lc_file_disk, len),
220                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
221         if (rc == len) {
222                 struct lfsck_namespace *ns = com->lc_file_ram;
223
224                 lfsck_namespace_le_to_cpu(ns,
225                                 (struct lfsck_namespace *)com->lc_file_disk);
226                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
227                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
228                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
229                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
230                         rc = 1;
231                 } else {
232                         rc = 0;
233                 }
234         } else if (rc != -ENODATA) {
235                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
236                        "expected = %d: rc = %d\n",
237                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
238                 if (rc >= 0)
239                         rc = 1;
240         }
241         return rc;
242 }
243
244 static int lfsck_namespace_store(const struct lu_env *env,
245                                  struct lfsck_component *com, bool init)
246 {
247         struct dt_object        *obj    = com->lc_obj;
248         struct lfsck_instance   *lfsck  = com->lc_lfsck;
249         struct thandle          *handle;
250         int                      len    = com->lc_file_size;
251         int                      rc;
252         ENTRY;
253
254         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
255                                   (struct lfsck_namespace *)com->lc_file_ram);
256         handle = dt_trans_create(env, lfsck->li_bottom);
257         if (IS_ERR(handle))
258                 GOTO(log, rc = PTR_ERR(handle));
259
260         rc = dt_declare_xattr_set(env, obj,
261                                   lfsck_buf_get(env, com->lc_file_disk, len),
262                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
263         if (rc != 0)
264                 GOTO(out, rc);
265
266         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
267         if (rc != 0)
268                 GOTO(out, rc);
269
270         rc = dt_xattr_set(env, obj,
271                           lfsck_buf_get(env, com->lc_file_disk, len),
272                           XATTR_NAME_LFSCK_NAMESPACE,
273                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
274                           handle, BYPASS_CAPA);
275
276         GOTO(out, rc);
277
278 out:
279         dt_trans_stop(env, lfsck->li_bottom, handle);
280
281 log:
282         if (rc != 0)
283                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
284                        lfsck_lfsck2name(lfsck), rc);
285         return rc;
286 }
287
288 static int lfsck_namespace_init(const struct lu_env *env,
289                                 struct lfsck_component *com)
290 {
291         struct lfsck_namespace *ns = com->lc_file_ram;
292         int rc;
293
294         memset(ns, 0, sizeof(*ns));
295         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
296         ns->ln_status = LS_INIT;
297         down_write(&com->lc_sem);
298         rc = lfsck_namespace_store(env, com, true);
299         up_write(&com->lc_sem);
300         return rc;
301 }
302
303 /**
304  * Update the namespace LFSCK tracing file for the given @fid
305  *
306  * \param[in] env       pointer to the thread context
307  * \param[in] com       pointer to the lfsck component
308  * \param[in] fid       the fid which flags to be updated in the lfsck
309  *                      tracing file
310  * \param[in] add       true if add new flags, otherwise remove flags
311  *
312  * \retval              0 for succeed or nothing to be done
313  * \retval              negative error number on failure
314  */
315 int lfsck_namespace_trace_update(const struct lu_env *env,
316                                  struct lfsck_component *com,
317                                  const struct lu_fid *fid,
318                                  const __u8 flags, bool add)
319 {
320         struct lfsck_instance   *lfsck  = com->lc_lfsck;
321         struct dt_object        *obj    = com->lc_obj;
322         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
323         struct dt_device        *dev    = lfsck->li_bottom;
324         struct thandle          *th     = NULL;
325         int                      rc     = 0;
326         __u8                     old    = 0;
327         __u8                     new    = 0;
328         ENTRY;
329
330         LASSERT(flags != 0);
331
332         down_write(&com->lc_sem);
333         fid_cpu_to_be(key, fid);
334         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
335                        (const struct dt_key *)key, BYPASS_CAPA);
336         if (rc == -ENOENT) {
337                 if (!add)
338                         GOTO(unlock, rc = 0);
339
340                 old = 0;
341                 new = flags;
342         } else if (rc == 0) {
343                 if (add) {
344                         if ((old & flags) == flags)
345                                 GOTO(unlock, rc = 0);
346
347                         new = old | flags;
348                 } else {
349                         if ((old & flags) == 0)
350                                 GOTO(unlock, rc = 0);
351
352                         new = old & ~flags;
353                 }
354         } else {
355                 GOTO(log, rc);
356         }
357
358         th = dt_trans_create(env, dev);
359         if (IS_ERR(th))
360                 GOTO(log, rc = PTR_ERR(th));
361
362         if (old != 0) {
363                 rc = dt_declare_delete(env, obj,
364                                        (const struct dt_key *)key, th);
365                 if (rc != 0)
366                         GOTO(log, rc);
367         }
368
369         if (new != 0) {
370                 rc = dt_declare_insert(env, obj,
371                                        (const struct dt_rec *)&new,
372                                        (const struct dt_key *)key, th);
373                 if (rc != 0)
374                         GOTO(log, rc);
375         }
376
377         rc = dt_trans_start_local(env, dev, th);
378         if (rc != 0)
379                 GOTO(log, rc);
380
381         if (old != 0) {
382                 rc = dt_delete(env, obj, (const struct dt_key *)key,
383                                th, BYPASS_CAPA);
384                 if (rc != 0)
385                         GOTO(log, rc);
386         }
387
388         if (new != 0) {
389                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
390                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
391                 if (rc != 0)
392                         GOTO(log, rc);
393         }
394
395         GOTO(log, rc);
396
397 log:
398         if (th != NULL && !IS_ERR(th))
399                 dt_trans_stop(env, dev, th);
400
401         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
402                "tracing file, flags %x, old %x, new %x: rc = %d\n",
403                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
404                (__u32)flags, (__u32)old, (__u32)new, rc);
405
406 unlock:
407         up_write(&com->lc_sem);
408
409         return rc;
410 }
411
412 static int lfsck_namespace_check_exist(const struct lu_env *env,
413                                        struct dt_object *dir,
414                                        struct dt_object *obj, const char *name)
415 {
416         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
417         int               rc;
418         ENTRY;
419
420         if (unlikely(lfsck_is_dead_obj(obj)))
421                 RETURN(LFSCK_NAMEENTRY_DEAD);
422
423         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
424                        (const struct dt_key *)name, BYPASS_CAPA);
425         if (rc == -ENOENT)
426                 RETURN(LFSCK_NAMEENTRY_REMOVED);
427
428         if (rc < 0)
429                 RETURN(rc);
430
431         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
432                 RETURN(LFSCK_NAMEENTRY_RECREATED);
433
434         RETURN(0);
435 }
436
437 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
438                                             struct dt_object *obj,
439                                             struct thandle *handle)
440 {
441         int rc;
442
443         /* For destroying all invalid linkEA entries. */
444         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
445         if (rc != 0)
446                 return rc;
447
448         /* For insert new linkEA entry. */
449         rc = dt_declare_xattr_set(env, obj,
450                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
451                         XATTR_NAME_LINK, 0, handle);
452         return rc;
453 }
454
455 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
456                        struct linkea_data *ldata)
457 {
458         int rc;
459
460         if (ldata->ld_buf->lb_buf == NULL)
461                 return -ENOMEM;
462
463         if (!dt_object_exists(obj))
464                 return -ENOENT;
465
466         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
467         if (rc == -ERANGE) {
468                 /* Buf was too small, figure out what we need. */
469                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
470                                   BYPASS_CAPA);
471                 if (rc <= 0)
472                         return rc;
473
474                 lu_buf_realloc(ldata->ld_buf, rc);
475                 if (ldata->ld_buf->lb_buf == NULL)
476                         return -ENOMEM;
477
478                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
479                                   BYPASS_CAPA);
480         }
481
482         if (rc > 0)
483                 rc = linkea_init(ldata);
484
485         return rc;
486 }
487
488 /**
489  * Remove linkEA for the given object.
490  *
491  * The caller should take the ldlm lock before the calling.
492  *
493  * \param[in] env       pointer to the thread context
494  * \param[in] com       pointer to the lfsck component
495  * \param[in] obj       pointer to the dt_object to be handled
496  *
497  * \retval              0 for repaired cases
498  * \retval              negative error number on failure
499  */
500 static int lfsck_namespace_links_remove(const struct lu_env *env,
501                                         struct lfsck_component *com,
502                                         struct dt_object *obj)
503 {
504         struct lfsck_instance           *lfsck  = com->lc_lfsck;
505         struct dt_device                *dev    = lfsck->li_bottom;
506         struct thandle                  *th     = NULL;
507         int                              rc     = 0;
508         ENTRY;
509
510         LASSERT(dt_object_remote(obj) == 0);
511
512         th = dt_trans_create(env, dev);
513         if (IS_ERR(th))
514                 GOTO(log, rc = PTR_ERR(th));
515
516         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
517         if (rc != 0)
518                 GOTO(stop, rc);
519
520         rc = dt_trans_start_local(env, dev, th);
521         if (rc != 0)
522                 GOTO(stop, rc);
523
524         dt_write_lock(env, obj, 0);
525         if (unlikely(lfsck_is_dead_obj(obj)))
526                 GOTO(unlock, rc = -ENOENT);
527
528         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
529                 GOTO(unlock, rc = 0);
530
531         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
532
533         GOTO(unlock, rc);
534
535 unlock:
536         dt_write_unlock(env, obj);
537
538 stop:
539         dt_trans_stop(env, dev, th);
540
541 log:
542         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
543                "for the object "DFID": rc = %d\n",
544                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
545
546         if (rc == 0) {
547                 struct lfsck_namespace *ns = com->lc_file_ram;
548
549                 ns->ln_flags |= LF_INCONSISTENT;
550         }
551
552         return rc;
553 }
554
555 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
556                              struct linkea_data *ldata, struct thandle *handle)
557 {
558         const struct lu_buf *buf = lfsck_buf_get_const(env,
559                                                        ldata->ld_buf->lb_buf,
560                                                        ldata->ld_leh->leh_len);
561
562         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
563                             BYPASS_CAPA);
564 }
565
566 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
567                                                 struct lu_name *cname,
568                                                 struct lu_fid *pfid,
569                                                 char *buf)
570 {
571         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
572         /* To guarantee the 'name' is terminated with '0'. */
573         memcpy(buf, cname->ln_name, cname->ln_namelen);
574         buf[cname->ln_namelen] = 0;
575         cname->ln_name = buf;
576 }
577
578 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
579                                                struct lu_name *cname,
580                                                struct lu_fid *pfid,
581                                                bool remove)
582 {
583         struct link_ea_entry    *oldlee;
584         int                      oldlen;
585         int                      repeated = 0;
586
587         oldlee = ldata->ld_lee;
588         oldlen = ldata->ld_reclen;
589         linkea_next_entry(ldata);
590         while (ldata->ld_lee != NULL) {
591                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
592                                    ldata->ld_lee->lee_reclen[1];
593                 if (unlikely(ldata->ld_reclen == oldlen &&
594                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
595                         repeated++;
596                         if (!remove)
597                                 break;
598
599                         linkea_del_buf(ldata, cname);
600                 } else {
601                         linkea_next_entry(ldata);
602                 }
603         }
604         ldata->ld_lee = oldlee;
605         ldata->ld_reclen = oldlen;
606
607         return repeated;
608 }
609
/*
 * Placeholder, not implemented yet: every argument is ignored and 0 is
 * returned unconditionally.
 */
static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan,
                                         const char *infix, const char *type,
                                         int *count)
{
        /* XXX: TBD */
        return 0;
}
619
/*
 * Placeholder, not implemented yet: every argument is ignored and 0 is
 * returned unconditionally.
 */
static int lfsck_namespace_insert_normal(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *parent,
                                         struct dt_object *child,
                                         const char *name)
{
        /* XXX: TBD */
        return 0;
}
629
/*
 * Placeholder, not implemented yet: every argument is ignored and 0 is
 * returned unconditionally.
 */
static int lfsck_namespace_create_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan)
{
        /* XXX: TBD */
        return 0;
}
637
638 /**
639  * Remove the specified entry from the linkEA.
640  *
641  * Locate the linkEA entry with the given @cname and @pfid, then
642  * remove this entry or the other entries those are repeated with
643  * this entry.
644  *
645  * \param[in] env       pointer to the thread context
646  * \param[in] com       pointer to the lfsck component
647  * \param[in] obj       pointer to the dt_object to be handled
648  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
649  * \param[in] cname     the name for the child in the parent directory
650  * \param[in] pfid      the parent directory's FID for the linkEA
651  * \param[in] next      if true, then remove the first found linkEA
652  *                      entry, and move the ldata->ld_lee to next entry
653  *
654  * \retval              positive number for repaired cases
655  * \retval              0 if nothing to be repaired
656  * \retval              negative error number on failure
657  */
658 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
659                                          struct lfsck_component *com,
660                                          struct dt_object *obj,
661                                          struct linkea_data *ldata,
662                                          struct lu_name *cname,
663                                          struct lu_fid *pfid,
664                                          bool next)
665 {
666         struct lfsck_instance           *lfsck     = com->lc_lfsck;
667         struct dt_device                *dev       = lfsck->li_bottom;
668         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
669         struct thandle                  *th        = NULL;
670         struct lustre_handle             lh        = { 0 };
671         struct linkea_data               ldata_new = { 0 };
672         struct lu_buf                    linkea_buf;
673         int                              rc        = 0;
674         ENTRY;
675
676         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
677                               MDS_INODELOCK_UPDATE |
678                               MDS_INODELOCK_XATTR, LCK_EX);
679         if (rc != 0)
680                 GOTO(log, rc);
681
682         if (next)
683                 linkea_del_buf(ldata, cname);
684         else
685                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
686                                                     true);
687         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
688                        ldata->ld_leh->leh_len);
689
690 again:
691         th = dt_trans_create(env, dev);
692         if (IS_ERR(th))
693                 GOTO(unlock1, rc = PTR_ERR(th));
694
695         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
696                                   XATTR_NAME_LINK, 0, th);
697         if (rc != 0)
698                 GOTO(stop, rc);
699
700         rc = dt_trans_start_local(env, dev, th);
701         if (rc != 0)
702                 GOTO(stop, rc);
703
704         dt_write_lock(env, obj, 0);
705         if (unlikely(lfsck_is_dead_obj(obj)))
706                 GOTO(unlock2, rc = -ENOENT);
707
708         rc = lfsck_links_read2(env, obj, &ldata_new);
709         if (rc != 0)
710                 GOTO(unlock2, rc);
711
712         /* The specified linkEA entry has been removed by race. */
713         rc = linkea_links_find(&ldata_new, cname, pfid);
714         if (rc != 0)
715                 GOTO(unlock2, rc = 0);
716
717         if (bk->lb_param & LPF_DRYRUN)
718                 GOTO(unlock2, rc = 1);
719
720         if (next)
721                 linkea_del_buf(&ldata_new, cname);
722         else
723                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
724                                                     true);
725
726         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
727                 dt_write_unlock(env, obj);
728                 dt_trans_stop(env, dev, th);
729                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
730                                ldata_new.ld_leh->leh_len);
731                 goto again;
732         }
733
734         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
735                        ldata_new.ld_leh->leh_len);
736         rc = dt_xattr_set(env, obj, &linkea_buf,
737                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
738
739         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
740
741 unlock2:
742         dt_write_unlock(env, obj);
743
744 stop:
745         dt_trans_stop(env, dev, th);
746
747 unlock1:
748         lfsck_ibits_unlock(&lh, LCK_EX);
749
750 log:
751         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
752                "for the object: "DFID", parent "DFID", name %.*s\n",
753                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
754                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
755                cname->ln_name);
756
757         if (rc != 0) {
758                 struct lfsck_namespace *ns = com->lc_file_ram;
759
760                 ns->ln_flags |= LF_INCONSISTENT;
761         }
762
763         return rc;
764 }
765
766 /**
767  * Conditionally remove the specified entry from the linkEA.
768  *
769  * Take the parent lock firstly, then check whether the specified
770  * name entry exists or not: if yes, do nothing; otherwise, call
771  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
772  *
773  * \param[in] env       pointer to the thread context
774  * \param[in] com       pointer to the lfsck component
775  * \param[in] parent    pointer to the parent directory
776  * \param[in] child     pointer to the child object that holds the linkEA
777  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
778  * \param[in] cname     the name for the child in the parent directory
779  * \param[in] pfid      the parent directory's FID for the linkEA
780  *
781  * \retval              positive number for repaired cases
782  * \retval              0 if nothing to be repaired
783  * \retval              negative error number on failure
784  */
785 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
786                                               struct lfsck_component *com,
787                                               struct dt_object *parent,
788                                               struct dt_object *child,
789                                               struct linkea_data *ldata,
790                                               struct lu_name *cname,
791                                               struct lu_fid *pfid)
792 {
793         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
794         struct lustre_handle     lh     = { 0 };
795         int                      rc;
796         ENTRY;
797
798         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
799                               MDS_INODELOCK_UPDATE, LCK_EX);
800         if (rc != 0)
801                 RETURN(rc);
802
803         dt_read_lock(env, parent, 0);
804         if (unlikely(lfsck_is_dead_obj(parent))) {
805                 dt_read_unlock(env, parent);
806                 lfsck_ibits_unlock(&lh, LCK_EX);
807                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
808                                                    cname, pfid, true);
809
810                 RETURN(rc);
811         }
812
813         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
814                        (const struct dt_key *)cname->ln_name,
815                        BYPASS_CAPA);
816         dt_read_unlock(env, parent);
817
818         /* It is safe to release the ldlm lock, because when the logic come
819          * here, we have got all the needed information above whether the
820          * linkEA entry is valid or not. It is not important that others
821          * may add new linkEA entry after the ldlm lock released. If other
822          * has removed the specified linkEA entry by race, then it is OK,
823          * because the subsequent lfsck_namespace_shrink_linkea() can handle
824          * such case. */
825         lfsck_ibits_unlock(&lh, LCK_EX);
826         if (rc == -ENOENT) {
827                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
828                                                    cname, pfid, true);
829
830                 RETURN(rc);
831         }
832
833         if (rc != 0)
834                 RETURN(rc);
835
836         /* The LFSCK just found some internal status of cross-MDTs
837          * create operation. That is normal. */
838         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
839                 linkea_next_entry(ldata);
840
841                 RETURN(0);
842         }
843
844         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
845                                            pfid, true);
846
847         RETURN(rc);
848 }
849
850 /**
851  * Overwrite the linkEA for the object with the given ldata.
852  *
853  * The caller should take the ldlm lock before the calling.
854  *
855  * \param[in] env       pointer to the thread context
856  * \param[in] com       pointer to the lfsck component
857  * \param[in] obj       pointer to the dt_object to be handled
858  * \param[in] ldata     pointer to the new linkEA data
859  *
860  * \retval              positive number for repaired cases
861  * \retval              0 if nothing to be repaired
862  * \retval              negative error number on failure
863  */
864 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
865                                    struct lfsck_component *com,
866                                    struct dt_object *obj,
867                                    struct linkea_data *ldata)
868 {
869         struct lfsck_instance           *lfsck  = com->lc_lfsck;
870         struct dt_device                *dev    = lfsck->li_bottom;
871         struct thandle                  *th     = NULL;
872         struct lu_buf                    linkea_buf;
873         int                              rc     = 0;
874         ENTRY;
875
876         LASSERT(!dt_object_remote(obj));
877
878         th = dt_trans_create(env, dev);
879         if (IS_ERR(th))
880                 GOTO(log, rc = PTR_ERR(th));
881
882         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
883                        ldata->ld_leh->leh_len);
884         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
885                                   XATTR_NAME_LINK, 0, th);
886         if (rc != 0)
887                 GOTO(stop, rc);
888
889         rc = dt_trans_start_local(env, dev, th);
890         if (rc != 0)
891                 GOTO(stop, rc);
892
893         dt_write_lock(env, obj, 0);
894         if (unlikely(lfsck_is_dead_obj(obj)))
895                 GOTO(unlock, rc = 0);
896
897         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
898                 GOTO(unlock, rc = 1);
899
900         rc = dt_xattr_set(env, obj, &linkea_buf,
901                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
902
903         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
904
905 unlock:
906         dt_write_unlock(env, obj);
907
908 stop:
909         dt_trans_stop(env, dev, th);
910
911 log:
912         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
913                "object "DFID": rc = %d\n",
914                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
915
916         if (rc != 0) {
917                 struct lfsck_namespace *ns = com->lc_file_ram;
918
919                 ns->ln_flags |= LF_INCONSISTENT;
920         }
921
922         return rc;
923 }
924
925 /**
926  * Update the ".." name entry for the given object.
927  *
928  * The object's ".." is corrupted, this function will update the ".." name
929  * entry with the given pfid, and the linkEA with the given ldata.
930  *
931  * The caller should take the ldlm lock before the calling.
932  *
933  * \param[in] env       pointer to the thread context
934  * \param[in] com       pointer to the lfsck component
935  * \param[in] obj       pointer to the dt_object to be handled
936  * \param[in] pfid      the new fid for the object's ".." name entry
937  * \param[in] cname     the name for the @obj in the parent directory
938  *
939  * \retval              positive number for repaired cases
940  * \retval              0 if nothing to be repaired
941  * \retval              negative error number on failure
942  */
943 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
944                                                   struct lfsck_component *com,
945                                                   struct dt_object *obj,
946                                                   const struct lu_fid *pfid,
947                                                   struct lu_name *cname)
948 {
949         struct lfsck_thread_info        *info   = lfsck_env_info(env);
950         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
951         struct lfsck_instance           *lfsck  = com->lc_lfsck;
952         struct dt_device                *dev    = lfsck->li_bottom;
953         struct thandle                  *th     = NULL;
954         struct linkea_data               ldata  = { 0 };
955         struct lu_buf                    linkea_buf;
956         int                              rc     = 0;
957         ENTRY;
958
959         LASSERT(!dt_object_remote(obj));
960         LASSERT(S_ISDIR(lfsck_object_type(obj)));
961
962         rc = linkea_data_new(&ldata, &info->lti_big_buf);
963         if (rc != 0)
964                 GOTO(log, rc);
965
966         rc = linkea_add_buf(&ldata, cname, pfid);
967         if (rc != 0)
968                 GOTO(log, rc);
969
970         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
971                        ldata.ld_leh->leh_len);
972
973         th = dt_trans_create(env, dev);
974         if (IS_ERR(th))
975                 GOTO(log, rc = PTR_ERR(th));
976
977         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
978         if (rc != 0)
979                 GOTO(stop, rc);
980
981         rec->rec_type = S_IFDIR;
982         rec->rec_fid = pfid;
983         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
984                                (const struct dt_key *)dotdot, th);
985         if (rc != 0)
986                 GOTO(stop, rc);
987
988         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
989                                   XATTR_NAME_LINK, 0, th);
990         if (rc != 0)
991                 GOTO(stop, rc);
992
993         rc = dt_trans_start_local(env, dev, th);
994         if (rc != 0)
995                 GOTO(stop, rc);
996
997         dt_write_lock(env, obj, 0);
998         if (unlikely(lfsck_is_dead_obj(obj)))
999                 GOTO(unlock, rc = 0);
1000
1001         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1002                 GOTO(unlock, rc = 1);
1003
1004         /* The old ".." name entry maybe not exist. */
1005         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
1006                   BYPASS_CAPA);
1007
1008         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
1009                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
1010         if (rc != 0)
1011                 GOTO(unlock, rc);
1012
1013         rc = dt_xattr_set(env, obj, &linkea_buf,
1014                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1015
1016         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1017
1018 unlock:
1019         dt_write_unlock(env, obj);
1020
1021 stop:
1022         dt_trans_stop(env, dev, th);
1023
1024 log:
1025         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
1026                "the object "DFID", new parent "DFID": rc = %d\n",
1027                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1028                PFID(pfid), rc);
1029
1030         if (rc != 0) {
1031                 struct lfsck_namespace *ns = com->lc_file_ram;
1032
1033                 ns->ln_flags |= LF_INCONSISTENT;
1034         }
1035
1036         return rc;
1037 }
1038
1039 /**
1040  * Handle orphan @obj during Double Scan Directory.
1041  *
1042  * Remove the @obj's current (invalid) linkEA entries, and insert
1043  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
1044  * ${FID}-${PFID}-D-${conflict_version}
1045  *
1046  * The caller should take the ldlm lock before the calling.
1047  *
1048  * \param[in] env       pointer to the thread context
1049  * \param[in] com       pointer to the lfsck component
1050  * \param[in] obj       pointer to the orphan object to be handled
1051  * \param[in] pfid      the new fid for the object's ".." name entry
1052  * \param[in,out] lh    ldlm lock handler for the given @obj
1053  *
1054  * \retval              positive number for repaired cases
1055  * \retval              0 if nothing to be repaired
1056  * \retval              negative error number on failure
1057  */
1058 static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
1059                                       struct lfsck_component *com,
1060                                       struct dt_object *obj,
1061                                       const struct lu_fid *pfid,
1062                                       struct lustre_handle *lh)
1063 {
1064         struct lfsck_thread_info *info = lfsck_env_info(env);
1065         int                       rc;
1066         ENTRY;
1067
1068         /* Remove the unrecognized linkEA. */
1069         rc = lfsck_namespace_links_remove(env, com, obj);
1070         lfsck_ibits_unlock(lh, LCK_EX);
1071         if (rc < 0 && rc != -ENODATA)
1072                 RETURN(rc);
1073
1074         /* The unique linkEA is invalid, even if the ".." name entry may be
1075          * valid, we still cannot know via which name entry this directory
1076          * will be referenced. Then handle it as pure orphan. */
1077         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1078                  "-"DFID, PFID(pfid));
1079         rc = lfsck_namespace_insert_orphan(env, com, obj,
1080                                            info->lti_tmpbuf, "D", NULL);
1081
1082         RETURN(rc);
1083 }
1084
1085 /**
1086  * Double Scan Directory object for single linkEA entry case.
1087  *
1088  * The given @child has unique linkEA entry. If the linkEA entry is valid,
1089  * then check whether the name is in the namespace or not, if not, add the
1090  * missing name entry back to namespace. If the linkEA entry is invalid,
1091  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
1092  * as an orphan.
1093  *
1094  * \param[in] env       pointer to the thread context
1095  * \param[in] com       pointer to the lfsck component
1096  * \param[in] child     pointer to the directory to be double scanned
1097  * \param[in] pfid      the FID corresponding to the ".." entry
1098  * \param[in] ldata     pointer to the linkEA data for the given @child
1099  * \param[in,out] lh    ldlm lock handler for the given @child
1100  * \param[out] type     to tell the caller what the inconsistency is
1101  * \param[in] retry     if found inconsistency, but the caller does not hold
1102  *                      ldlm lock on the @child, then set @retry as true
1103  *
1104  * \retval              positive number for repaired cases
1105  * \retval              0 if nothing to be repaired
1106  * \retval              negative error number on failure
1107  */
1108 static int
1109 lfsck_namespace_dsd_single(const struct lu_env *env,
1110                            struct lfsck_component *com,
1111                            struct dt_object *child,
1112                            const struct lu_fid *pfid,
1113                            struct linkea_data *ldata,
1114                            struct lustre_handle *lh,
1115                            enum lfsck_namespace_inconsistency_type *type,
1116                            bool *retry)
1117 {
1118         struct lfsck_thread_info *info          = lfsck_env_info(env);
1119         struct lu_name           *cname         = &info->lti_name;
1120         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1121         struct lu_fid            *tfid          = &info->lti_fid3;
1122         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1123         struct dt_object         *parent        = NULL;
1124         int                       rc            = 0;
1125         ENTRY;
1126
1127         lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
1128         /* The unique linkEA entry with bad parent will be handled as orphan. */
1129         if (!fid_is_sane(tfid)) {
1130                 if (!lustre_handle_is_used(lh) && retry != NULL)
1131                         *retry = true;
1132                 else
1133                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1134                                                         pfid, lh);
1135
1136                 GOTO(out, rc);
1137         }
1138
1139         parent = lfsck_object_find_bottom(env, lfsck, tfid);
1140         if (IS_ERR(parent))
1141                 GOTO(out, rc = PTR_ERR(parent));
1142
1143         /* We trust the unique linkEA entry in spite of whether it matches the
1144          * ".." name entry or not. Because even if the linkEA entry is wrong
1145          * and the ".." name entry is right, we still cannot know via which
1146          * name entry the child will be referenced, since all known entries
1147          * have been verified during the first-stage scanning. */
1148         if (!dt_object_exists(parent)) {
1149                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1150                         *retry = true;
1151
1152                         GOTO(out, rc = 0);
1153                 }
1154
1155                 lfsck_ibits_unlock(lh, LCK_EX);
1156                 /* Create the lost parent as an orphan. */
1157                 rc = lfsck_namespace_create_orphan(env, com, parent);
1158                 if (rc >= 0)
1159                         /* Add the missing name entry to the parent. */
1160                         rc = lfsck_namespace_insert_normal(env, com, parent,
1161                                                         child, cname->ln_name);
1162
1163                 GOTO(out, rc);
1164         }
1165
1166         /* The unique linkEA entry with bad parent will be handled as orphan. */
1167         if (unlikely(!dt_try_as_dir(env, parent))) {
1168                 if (!lustre_handle_is_used(lh) && retry != NULL)
1169                         *retry = true;
1170                 else
1171                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1172                                                         pfid, lh);
1173
1174                 GOTO(out, rc);
1175         }
1176
1177         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1178                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
1179         if (rc == -ENOENT) {
1180                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1181                         *retry = true;
1182
1183                         GOTO(out, rc = 0);
1184                 }
1185
1186                 lfsck_ibits_unlock(lh, LCK_EX);
1187                 /* Add the missing name entry back to the namespace. */
1188                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
1189                                                    cname->ln_name);
1190
1191                 GOTO(out, rc);
1192         }
1193
1194         if (rc != 0)
1195                 GOTO(out, rc);
1196
1197         /* XXX: The name entry references another MDT-object that may be
1198          *      created by the LFSCK for repairing dangling name entry.
1199          *      There will be another patch for further processing. */
1200         if (!lu_fid_eq(tfid, cfid)) {
1201                 if (!lustre_handle_is_used(lh) && retry != NULL)
1202                         *retry = true;
1203                 else
1204                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1205                                                         pfid, lh);
1206
1207                 GOTO(out, rc);
1208         }
1209
1210         /* The ".." name entry is wrong, update it. */
1211         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
1212                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1213                         *retry = true;
1214
1215                         GOTO(out, rc = 0);
1216                 }
1217
1218                 *type = LNIT_UNMATCHED_PAIRS;
1219                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
1220                                                 lfsck_dto2fid(parent), cname);
1221         }
1222
1223         GOTO(out, rc);
1224
1225 out:
1226         if (parent != NULL && !IS_ERR(parent))
1227                 lfsck_object_put(env, parent);
1228
1229         return rc;
1230 }
1231
1232 /**
1233  * Double Scan Directory object for single linkEA entry case.
1234  *
1235  * The given @child has multiple linkEA entries. There is at most one linkEA
1236  * entry will be valid, all the others will be removed. Firstly, the function
1237  * will try to find out the linkEA entry for which the name entry exists under
1238  * the given parent (@pfid). If there is no linkEA entry that matches the given
1239  * ".." name entry, then tries to find out the first linkEA entry that both the
1240  * parent and the name entry exist to rebuild a new ".." name entry.
1241  *
1242  * \param[in] env       pointer to the thread context
1243  * \param[in] com       pointer to the lfsck component
1244  * \param[in] child     pointer to the directory to be double scanned
1245  * \param[in] pfid      the FID corresponding to the ".." entry
1246  * \param[in] ldata     pointer to the linkEA data for the given @child
1247  * \param[in,out] lh    ldlm lock handler for the given @child
1248  * \param[out] type     to tell the caller what the inconsistency is
1249  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
1250  *
1251  * \retval              positive number for repaired cases
1252  * \retval              0 if nothing to be repaired
1253  * \retval              negative error number on failure
1254  */
1255 static int
1256 lfsck_namespace_dsd_multiple(const struct lu_env *env,
1257                              struct lfsck_component *com,
1258                              struct dt_object *child,
1259                              const struct lu_fid *pfid,
1260                              struct linkea_data *ldata,
1261                              struct lustre_handle *lh,
1262                              enum lfsck_namespace_inconsistency_type *type,
1263                              bool lpf)
1264 {
1265         struct lfsck_thread_info *info          = lfsck_env_info(env);
1266         struct lu_name           *cname         = &info->lti_name;
1267         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1268         struct lu_fid            *tfid          = &info->lti_fid3;
1269         struct lu_fid            *pfid2         = &info->lti_fid4;
1270         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1271         struct dt_object         *parent        = NULL;
1272         struct linkea_data        ldata_new     = { 0 };
1273         int                       rc            = 0;
1274         bool                      once          = true;
1275         ENTRY;
1276
1277 again:
1278         while (ldata->ld_lee != NULL) {
1279                 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
1280                                                     info->lti_key);
1281                 /* Drop repeated linkEA entries. */
1282                 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
1283                 /* Drop invalid linkEA entry. */
1284                 if (!fid_is_sane(tfid)) {
1285                         linkea_del_buf(ldata, cname);
1286                         continue;
1287                 }
1288
1289                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
1290                  * then it is possible that: the directry object has ever
1291                  * been lost, but its name entry was there. In the former
1292                  * LFSCK run, during the first-stage scanning, the LFSCK
1293                  * found the dangling name entry, but it did not recreate
1294                  * the lost object, and when moved to the second-stage
1295                  * scanning, some children objects of the lost directory
1296                  * object were found, then the LFSCK recreated such lost
1297                  * directory object as an orphan.
1298                  *
1299                  * When the LFSCK runs again, if the dangling name is still
1300                  * there, the LFSCK should move the orphan directory object
1301                  * back to the normal namespace. */
1302                 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
1303                         linkea_next_entry(ldata);
1304                         continue;
1305                 }
1306
1307                 parent = lfsck_object_find_bottom(env, lfsck, tfid);
1308                 if (IS_ERR(parent))
1309                         RETURN(PTR_ERR(parent));
1310
1311                 if (!dt_object_exists(parent)) {
1312                         lfsck_object_put(env, parent);
1313                         if (ldata->ld_leh->leh_reccount > 1) {
1314                                 /* If it is NOT the last linkEA entry, then
1315                                  * there is still other chance to make the
1316                                  * child to be visible via other parent, then
1317                                  * remove this linkEA entry. */
1318                                 linkea_del_buf(ldata, cname);
1319                                 continue;
1320                         }
1321
1322                         break;
1323                 }
1324
1325                 /* The linkEA entry with bad parent will be removed. */
1326                 if (unlikely(!dt_try_as_dir(env, parent))) {
1327                         lfsck_object_put(env, parent);
1328                         linkea_del_buf(ldata, cname);
1329                         continue;
1330                 }
1331
1332                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1333                                (const struct dt_key *)cname->ln_name,
1334                                BYPASS_CAPA);
1335                 *pfid2 = *lfsck_dto2fid(parent);
1336                 lfsck_object_put(env, parent);
1337                 if (rc == -ENOENT) {
1338                         linkea_next_entry(ldata);
1339                         continue;
1340                 }
1341
1342                 if (rc != 0)
1343                         RETURN(rc);
1344
1345                 if (lu_fid_eq(tfid, cfid)) {
1346                         if (!lu_fid_eq(pfid, pfid2)) {
1347                                 *type = LNIT_UNMATCHED_PAIRS;
1348                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
1349                                                 com, child, pfid2, cname);
1350
1351                                 RETURN(rc);
1352                         }
1353
1354                         /* It is the most common case that we find the
1355                          * name entry corresponding to the linkEA entry
1356                          * that matches the ".." name entry. */
1357                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
1358                         if (rc != 0)
1359                                 RETURN(rc);
1360
1361                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
1362                         if (rc != 0)
1363                                 RETURN(rc);
1364
1365                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
1366                                                             &ldata_new);
1367
1368                         /* XXX: there will be other patch. */
1369
1370                         RETURN(rc);
1371                 }
1372
1373                 /* XXX: The name entry references another MDT-object that
1374                  *      may be created by the LFSCK for repairing dangling
1375                  *      name entry. There will be another patch for further
1376                  *      processing. */
1377                 linkea_del_buf(ldata, cname);
1378         }
1379
1380         if (ldata->ld_leh->leh_reccount == 1) {
1381                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
1382                                                 lh, type, NULL);
1383
1384                 RETURN(rc);
1385         }
1386
1387         /* All linkEA entries are invalid and removed, then handle the @child
1388          * as an orphan.*/
1389         if (ldata->ld_leh->leh_reccount == 0) {
1390                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh);
1391
1392                 RETURN(rc);
1393         }
1394
1395         linkea_first_entry(ldata);
1396         /* If the dangling name entry for the orphan directory object has
1397          * been remvoed, then just check whether the directory object is
1398          * still under the .lustre/lost+found/MDTxxxx/ or not. */
1399         if (lpf) {
1400                 lpf = false;
1401                 goto again;
1402         }
1403
1404         /* There is no linkEA entry that matches the ".." name entry. Find
1405          * the first linkEA entry that both parent and name entry exist to
1406          * rebuild a new ".." name entry. */
1407         if (once) {
1408                 once = false;
1409                 goto again;
1410         }
1411
1412         RETURN(rc);
1413 }
1414
1415 /**
1416  * Double scan the directory object for namespace LFSCK.
1417  *
1418  * This function will verify the <parent, child> pairs in the namespace tree:
1419  * the parent references the child via some name entry that should be in the
1420  * child's linkEA entry, the child should back references the parent via its
1421  * ".." name entry.
1422  *
1423  * The LFSCK will scan every linkEA entry in turn until find out the first
1424  * matched pairs. If found, then all other linkEA entries will be dropped.
1425  * If all the linkEA entries cannot match the ".." name entry, then there
1426  * are several possible cases:
1427  *
1428  * 1) If there is only one linkEA entry, then trust it as long as the PFID
1429  *    in the linkEA entry is valid.
1430  *
1431  * 2) If there are multiple linkEA entries, then try to find the linkEA
1432  *    that matches the ".." name entry. If found, then all other entries
1433  *    are invalid; otherwise, it is quite possible that the ".." name entry
1434  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
1435  *    entry according to the first valid linkEA entry (both the parent and
1436  *    the name entry should exist).
1437  *
1438  * 3) If the directory object has no (valid) linkEA entry, then the
1439  *    directory object will be handled as pure orphan and inserted
1440  *    in the .lustre/lost+found/MDTxxxx/ with the name:
1441  *    ${self_FID}-${PFID}-D-${conflict_version}
1442  *
1443  * \param[in] env       pointer to the thread context
1444  * \param[in] com       pointer to the lfsck component
1445  * \param[in] child     pointer to the directory object to be handled
1446  * \param[in] flags     to indicate the special checking on the @child
1447  *
1448  * \retval              positive number for repaired cases
1449  * \retval              0 if nothing to be repaired
1450  * \retval              negative error number on failure
1451  */
1452 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
1453                                            struct lfsck_component *com,
1454                                            struct dt_object *child, __u8 flags)
1455 {
1456         struct lfsck_thread_info *info          = lfsck_env_info(env);
1457         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1458         struct lu_fid            *pfid          = &info->lti_fid2;
1459         struct lfsck_namespace   *ns            = com->lc_file_ram;
1460         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1461         struct lustre_handle      lh            = { 0 };
1462         struct linkea_data        ldata         = { 0 };
1463         bool                      unknown       = false;
1464         bool                      lpf           = false;
1465         bool                      retry         = false;
1466         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
1467         int                       rc            = 0;
1468         ENTRY;
1469
1470         LASSERT(!dt_object_remote(child));
1471
1472         if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
1473                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
1474                        "the namespace LFSCK, then the LFSCK cannot guarantee"
1475                        "all the name entries have been verified in first-stage"
1476                        "scanning. So have to skip orphan related handling for"
1477                        "the directory object "DFID" with remote name entry\n",
1478                        lfsck_lfsck2name(lfsck), PFID(cfid));
1479
1480                 RETURN(0);
1481         }
1482
1483         if (unlikely(!dt_try_as_dir(env, child)))
1484                 GOTO(out, rc = -ENOTDIR);
1485
1486         /* We only take ldlm lock on the @child when required. When the
1487          * logic comes here for the first time, it is always false. */
1488         if (0) {
1489
1490 lock:
1491                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1492                                       MDS_INODELOCK_UPDATE |
1493                                       MDS_INODELOCK_XATTR, LCK_EX);
1494                 if (rc != 0)
1495                         GOTO(out, rc);
1496         }
1497
1498         dt_read_lock(env, child, 0);
1499         if (unlikely(lfsck_is_dead_obj(child))) {
1500                 dt_read_unlock(env, child);
1501
1502                 GOTO(out, rc = 0);
1503         }
1504
1505         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
1506                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1507         if (rc != 0) {
1508                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
1509                         dt_read_unlock(env, child);
1510
1511                         GOTO(out, rc);
1512                 }
1513
1514                 if (!lustre_handle_is_used(&lh)) {
1515                         dt_read_unlock(env, child);
1516                         goto lock;
1517                 }
1518
1519                 fid_zero(pfid);
1520         } else if (lfsck->li_lpf_obj != NULL &&
1521                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
1522                 lpf = true;
1523         }
1524
1525         rc = lfsck_links_read(env, child, &ldata);
1526         dt_read_unlock(env, child);
1527         if (rc != 0) {
1528                 if (rc != -ENODATA && rc != -EINVAL)
1529                         GOTO(out, rc);
1530
1531                 if (!lustre_handle_is_used(&lh))
1532                         goto lock;
1533
1534                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
1535                         /* Remove the corrupted linkEA. */
1536                         rc = lfsck_namespace_links_remove(env, com, child);
1537                         if (rc == 0)
1538                                 /* Here, because of the crashed linkEA, we
1539                                  * cannot know whether there is some parent
1540                                  * that references the child directory via
1541                                  * some name entry or not. So keep it there,
1542                                  * when the LFSCK run next time, if there is
1543                                  * some parent that references this object,
1544                                  * then the LFSCK can rebuild the linkEA;
1545                                  * otherwise, this object will be handled
1546                                  * as orphan as above. */
1547                                 unknown = true;
1548                 } else {
1549                         /* 1. If we have neither ".." nor linkEA,
1550                          *    then it is an orphan.
1551                          *
1552                          * 2. If we only have the ".." name entry,
1553                          *    but no parent references this child
1554                          *    directory, then handle it as orphan. */
1555                         lfsck_ibits_unlock(&lh, LCK_EX);
1556                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1557                                  "-"DFID, PFID(pfid));
1558                         rc = lfsck_namespace_insert_orphan(env, com, child,
1559                                                 info->lti_tmpbuf, "D", NULL);
1560                 }
1561
1562                 GOTO(out, rc);
1563         }
1564
1565         linkea_first_entry(&ldata);
1566         /* This is the most common case: the object has unique linkEA entry. */
1567         if (ldata.ld_leh->leh_reccount == 1) {
1568                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
1569                                                 &lh, &type, &retry);
1570                 if (retry) {
1571                         LASSERT(!lustre_handle_is_used(&lh));
1572
1573                         retry = false;
1574                         goto lock;
1575                 }
1576
1577                 GOTO(out, rc);
1578         }
1579
1580         if (!lustre_handle_is_used(&lh))
1581                 goto lock;
1582
1583         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
1584                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh);
1585
1586                 GOTO(out, rc);
1587         }
1588
1589         /* When we come here, the cases usually like that:
1590          * 1) The directory object has a corrupted linkEA entry. During the
1591          *    first-stage scanning, the LFSCK cannot know such corruption,
1592          *    then it appends the right linkEA entry according to the found
1593          *    name entry after the bad one.
1594          *
1595          * 2) The directory object has a right linkEA entry. During the
1596          *    first-stage scanning, the LFSCK finds some bad name entry,
1597          *    but the LFSCK cannot aware that at that time, then it adds
1598          *    the bad linkEA entry for further processing. */
1599         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
1600                                           &lh, &type, lpf);
1601
1602         GOTO(out, rc);
1603
1604 out:
1605         lfsck_ibits_unlock(&lh, LCK_EX);
1606         if (rc > 0) {
1607                 switch (type) {
1608                 case LNIT_BAD_LINKEA:
1609                         ns->ln_linkea_repaired++;
1610                         break;
1611                 case LNIT_UNMATCHED_PAIRS:
1612                         ns->ln_unmatched_pairs_repaired++;
1613                         break;
1614                 default:
1615                         break;
1616                 }
1617         }
1618
1619         if (unknown)
1620                 ns->ln_unknown_inconsistency++;
1621
1622         return rc;
1623 }
1624
1625 /**
1626  * Double scan the MDT-object for namespace LFSCK.
1627  *
1628  * If the MDT-object contains invalid or repeated linkEA entries, then drop
1629  * those entries from the linkEA; if the linkEA becomes empty or the object
1630  * has no linkEA, then it is an orphan and will be added into the directory
1631  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
1632  * the remote parent; if the name entry corresponding to some linkEA entry
1633  * is lost, then add the name entry back to the namespace.
1634  *
1635  * \param[in] env       pointer to the thread context
1636  * \param[in] com       pointer to the lfsck component
1637  * \param[in] child     pointer to the dt_object to be handled
1638  * \param[in] flags     some hints to indicate how the @child should be handled
1639  *
1640  * \retval              positive number for repaired cases
1641  * \retval              0 if nothing to be repaired
1642  * \retval              negative error number on failure
1643  */
1644 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
1645                                            struct lfsck_component *com,
1646                                            struct dt_object *child, __u8 flags)
1647 {
1648         struct lfsck_thread_info *info     = lfsck_env_info(env);
1649         struct lu_attr           *la       = &info->lti_la;
1650         struct lu_name           *cname    = &info->lti_name;
1651         struct lu_fid            *pfid     = &info->lti_fid;
1652         struct lu_fid            *cfid     = &info->lti_fid2;
1653         struct lfsck_instance    *lfsck    = com->lc_lfsck;
1654         struct lfsck_namespace   *ns       = com->lc_file_ram;
1655         struct dt_object         *parent   = NULL;
1656         struct linkea_data        ldata    = { 0 };
1657         bool                      repaired = false;
1658         int                       count    = 0;
1659         int                       rc;
1660         ENTRY;
1661
1662         dt_read_lock(env, child, 0);
1663         if (unlikely(lfsck_is_dead_obj(child))) {
1664                 dt_read_unlock(env, child);
1665
1666                 RETURN(0);
1667         }
1668
1669         if (S_ISDIR(lfsck_object_type(child))) {
1670                 dt_read_unlock(env, child);
1671                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
1672
1673                 RETURN(rc);
1674         }
1675
1676         rc = lfsck_links_read(env, child, &ldata);
1677         dt_read_unlock(env, child);
1678         if (rc != 0)
1679                 GOTO(out, rc);
1680
1681         linkea_first_entry(&ldata);
1682         while (ldata.ld_lee != NULL) {
1683                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
1684                                                     info->lti_key);
1685                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
1686                                                          false);
1687                 /* Found repeated linkEA entries */
1688                 if (rc > 0) {
1689                         rc = lfsck_namespace_shrink_linkea(env, com, child,
1690                                                 &ldata, cname, pfid, false);
1691                         if (rc < 0)
1692                                 GOTO(out, rc);
1693
1694                         if (rc == 0)
1695                                 continue;
1696
1697                         repaired = true;
1698
1699                         /* fall through */
1700                 }
1701
1702                 /* Invalid PFID in the linkEA entry. */
1703                 if (!fid_is_sane(pfid)) {
1704                         rc = lfsck_namespace_shrink_linkea(env, com, child,
1705                                                 &ldata, cname, pfid, true);
1706                         if (rc < 0)
1707                                 GOTO(out, rc);
1708
1709                         if (rc > 0)
1710                                 repaired = true;
1711
1712                         continue;
1713                 }
1714
1715                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
1716                 if (IS_ERR(parent))
1717                         GOTO(out, rc = PTR_ERR(parent));
1718
1719                 if (!dt_object_exists(parent)) {
1720                         if (ldata.ld_leh->leh_reccount > 1) {
1721                                 /* If it is NOT the last linkEA entry, then
1722                                  * there is still other chance to make the
1723                                  * child to be visible via other parent, then
1724                                  * remove this linkEA entry. */
1725                                 rc = lfsck_namespace_shrink_linkea(env, com,
1726                                         child, &ldata, cname, pfid, true);
1727                         } else {
1728                                 /* Create the lost parent as an orphan. */
1729                                 rc = lfsck_namespace_create_orphan(env, com,
1730                                                                    parent);
1731                                 if (rc < 0) {
1732                                         lfsck_object_put(env, parent);
1733
1734                                         GOTO(out, rc);
1735                                 }
1736
1737                                 if (rc > 0)
1738                                         repaired = true;
1739
1740                                 /* Add the missing name entry to the parent. */
1741                                 rc = lfsck_namespace_insert_normal(env, com,
1742                                                 parent, child, cname->ln_name);
1743                                 linkea_next_entry(&ldata);
1744                         }
1745
1746                         lfsck_object_put(env, parent);
1747                         if (rc < 0)
1748                                 GOTO(out, rc);
1749
1750                         if (rc > 0)
1751                                 repaired = true;
1752
1753                         continue;
1754                 }
1755
1756                 /* The linkEA entry with bad parent will be removed. */
1757                 if (unlikely(!dt_try_as_dir(env, parent))) {
1758                         lfsck_object_put(env, parent);
1759                         rc = lfsck_namespace_shrink_linkea(env, com, child,
1760                                                 &ldata, cname, pfid, true);
1761                         if (rc < 0)
1762                                 GOTO(out, rc);
1763
1764                         if (rc > 0)
1765                                 repaired = true;
1766
1767                         continue;
1768                 }
1769
1770                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1771                                (const struct dt_key *)cname->ln_name,
1772                                BYPASS_CAPA);
1773                 if (rc != 0 && rc != -ENOENT) {
1774                         lfsck_object_put(env, parent);
1775
1776                         GOTO(out, rc);
1777                 }
1778
1779                 if (rc == 0) {
1780                         lfsck_object_put(env, parent);
1781                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1782                                 /* It is the most common case that we
1783                                  * find the name entry corresponding
1784                                  * to the linkEA entry. */
1785                                 linkea_next_entry(&ldata);
1786                         } else {
1787                                 /* XXX: The name entry references another
1788                                  *      MDT-object that may be created by
1789                                  *      the LFSCK for repairing dangling
1790                                  *      name entry. There will be another
1791                                  *      patch for further processing. */
1792                                 rc = lfsck_namespace_shrink_linkea(env, com,
1793                                         child, &ldata, cname, pfid, true);
1794                                 if (rc < 0)
1795                                         GOTO(out, rc);
1796
1797                                 if (rc > 0)
1798                                         repaired = true;
1799                         }
1800
1801                         continue;
1802                 }
1803
1804                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
1805                 if (rc != 0)
1806                         GOTO(out, rc);
1807
1808                 /* If there is no name entry in the parent dir and the object
1809                  * link count is less than the linkea entries count, then the
1810                  * linkea entry should be removed. */
1811                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
1812                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
1813                                         parent, child, &ldata, cname, pfid);
1814                         lfsck_object_put(env, parent);
1815                         if (rc < 0)
1816                                 GOTO(out, rc);
1817
1818                         if (rc > 0)
1819                                 repaired = true;
1820
1821                         continue;
1822                 }
1823
1824                 /* Add the missing name entry back to the namespace. */
1825                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
1826                                                    cname->ln_name);
1827                 lfsck_object_put(env, parent);
1828                 if (rc < 0)
1829                         GOTO(out, rc);
1830
1831                 if (rc > 0)
1832                         repaired = true;
1833
1834                 linkea_next_entry(&ldata);
1835         }
1836
1837         GOTO(out, rc = 0);
1838
1839 out:
1840         if (rc < 0 && rc != -ENODATA)
1841                 return rc;
1842
1843         if (rc == 0) {
1844                 LASSERT(ldata.ld_leh != NULL);
1845
1846                 count = ldata.ld_leh->leh_reccount;
1847         }
1848
1849         if (count == 0) {
1850                 /* If the child becomes orphan, then insert it into
1851                  * the global .lustre/lost+found/MDTxxxx directory. */
1852                 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
1853                                                    &count);
1854                 if (rc < 0)
1855                         return rc;
1856
1857                 if (rc > 0)
1858                         repaired = true;
1859         }
1860
1861         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
1862         if (rc != 0)
1863                 return rc;
1864
1865         if (la->la_nlink != count) {
1866                 /* XXX: there will be other patch(es) for MDT-object
1867                  *      hard links verification. */
1868         }
1869
1870         if (repaired) {
1871                 if (la->la_nlink > 1)
1872                         ns->ln_mul_linked_repaired++;
1873
1874                 if (rc == 0)
1875                         rc = 1;
1876         }
1877
1878         return rc;
1879 }
1880
1881 static void lfsck_namespace_dump_statistics(struct seq_file *m,
1882                                             struct lfsck_namespace *ns,
1883                                             __u64 checked_phase1,
1884                                             __u64 checked_phase2,
1885                                             __u32 time_phase1,
1886                                             __u32 time_phase2)
1887 {
1888         seq_printf(m, "checked_phase1: "LPU64"\n"
1889                       "checked_phase2: "LPU64"\n"
1890                       "updated_phase1: "LPU64"\n"
1891                       "updated_phase2: "LPU64"\n"
1892                       "failed_phase1: "LPU64"\n"
1893                       "failed_phase2: "LPU64"\n"
1894                       "directories: "LPU64"\n"
1895                       "dirent_repaired: "LPU64"\n"
1896                       "linkea_repaired: "LPU64"\n"
1897                       "nlinks_repaired: "LPU64"\n"
1898                       "lost_found: "LPU64"\n"
1899                       "multiple_linked_checked: "LPU64"\n"
1900                       "multiple_linked_repaired: "LPU64"\n"
1901                       "unknown_inconsistency: "LPU64"\n"
1902                       "unmatched_pairs_repaired: "LPU64"\n"
1903                       "success_count: %u\n"
1904                       "run_time_phase1: %u seconds\n"
1905                       "run_time_phase2: %u seconds\n",
1906                       checked_phase1,
1907                       checked_phase2,
1908                       ns->ln_items_repaired,
1909                       ns->ln_objs_repaired_phase2,
1910                       ns->ln_items_failed,
1911                       ns->ln_objs_failed_phase2,
1912                       ns->ln_dirs_checked,
1913                       ns->ln_dirent_repaired,
1914                       ns->ln_linkea_repaired,
1915                       ns->ln_objs_nlink_repaired,
1916                       ns->ln_objs_lost_found,
1917                       ns->ln_mul_linked_checked,
1918                       ns->ln_mul_linked_repaired,
1919                       ns->ln_unknown_inconsistency,
1920                       ns->ln_unmatched_pairs_repaired,
1921                       ns->ln_success_count,
1922                       time_phase1,
1923                       time_phase2);
1924 }
1925
1926 /* namespace APIs */
1927
1928 static int lfsck_namespace_reset(const struct lu_env *env,
1929                                  struct lfsck_component *com, bool init)
1930 {
1931         struct lfsck_instance   *lfsck = com->lc_lfsck;
1932         struct lfsck_namespace  *ns    = com->lc_file_ram;
1933         struct dt_object        *root;
1934         struct dt_object        *dto;
1935         int                      rc;
1936         ENTRY;
1937
1938         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1939         if (IS_ERR(root))
1940                 GOTO(log, rc = PTR_ERR(root));
1941
1942         if (unlikely(!dt_try_as_dir(env, root)))
1943                 GOTO(put, rc = -ENOTDIR);
1944
1945         down_write(&com->lc_sem);
1946         if (init) {
1947                 memset(ns, 0, sizeof(*ns));
1948         } else {
1949                 __u32 count = ns->ln_success_count;
1950                 __u64 last_time = ns->ln_time_last_complete;
1951
1952                 memset(ns, 0, sizeof(*ns));
1953                 ns->ln_success_count = count;
1954                 ns->ln_time_last_complete = last_time;
1955         }
1956         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
1957         ns->ln_status = LS_INIT;
1958
1959         rc = local_object_unlink(env, lfsck->li_bottom, root,
1960                                  lfsck_namespace_name);
1961         if (rc != 0)
1962                 GOTO(out, rc);
1963
1964         lfsck_object_put(env, com->lc_obj);
1965         com->lc_obj = NULL;
1966         dto = local_index_find_or_create(env, lfsck->li_los, root,
1967                                          lfsck_namespace_name,
1968                                          S_IFREG | S_IRUGO | S_IWUSR,
1969                                          &dt_lfsck_features);
1970         if (IS_ERR(dto))
1971                 GOTO(out, rc = PTR_ERR(dto));
1972
1973         com->lc_obj = dto;
1974         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
1975         if (rc != 0)
1976                 GOTO(out, rc);
1977
1978         rc = lfsck_namespace_store(env, com, true);
1979
1980         GOTO(out, rc);
1981
1982 out:
1983         up_write(&com->lc_sem);
1984
1985 put:
1986         lu_object_put(env, &root->do_lu);
1987 log:
1988         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
1989                lfsck_lfsck2name(lfsck), rc);
1990         return rc;
1991 }
1992
1993 static void
1994 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
1995                      bool new_checked)
1996 {
1997         struct lfsck_namespace *ns = com->lc_file_ram;
1998
1999         down_write(&com->lc_sem);
2000         if (new_checked)
2001                 com->lc_new_checked++;
2002         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2003         up_write(&com->lc_sem);
2004 }
2005
2006 static int lfsck_namespace_checkpoint(const struct lu_env *env,
2007                                       struct lfsck_component *com, bool init)
2008 {
2009         struct lfsck_instance   *lfsck = com->lc_lfsck;
2010         struct lfsck_namespace  *ns    = com->lc_file_ram;
2011         int                      rc;
2012
2013         if (!init) {
2014                 rc = lfsck_checkpoint_generic(env, com);
2015                 if (rc != 0)
2016                         goto log;
2017         }
2018
2019         down_write(&com->lc_sem);
2020         if (init) {
2021                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
2022         } else {
2023                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
2024                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2025                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2026                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
2027                 ns->ln_items_checked += com->lc_new_checked;
2028                 com->lc_new_checked = 0;
2029         }
2030
2031         rc = lfsck_namespace_store(env, com, false);
2032         up_write(&com->lc_sem);
2033
2034 log:
2035         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
2036                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
2037                lfsck->li_pos_current.lp_oit_cookie,
2038                PFID(&lfsck->li_pos_current.lp_dir_parent),
2039                lfsck->li_pos_current.lp_dir_cookie, rc);
2040
2041         return rc > 0 ? 0 : rc;
2042 }
2043
2044 static int lfsck_namespace_prep(const struct lu_env *env,
2045                                 struct lfsck_component *com,
2046                                 struct lfsck_start_param *lsp)
2047 {
2048         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2049         struct lfsck_namespace  *ns     = com->lc_file_ram;
2050         struct lfsck_position   *pos    = &com->lc_pos_start;
2051         int                      rc;
2052
2053         if (ns->ln_status == LS_COMPLETED) {
2054                 rc = lfsck_namespace_reset(env, com, false);
2055                 if (rc == 0)
2056                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
2057
2058                 if (rc != 0) {
2059                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
2060                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
2061
2062                         return rc;
2063                 }
2064         }
2065
2066         down_write(&com->lc_sem);
2067         ns->ln_time_latest_start = cfs_time_current_sec();
2068         spin_lock(&lfsck->li_lock);
2069
2070         if (ns->ln_flags & LF_SCANNED_ONCE) {
2071                 if (!lfsck->li_drop_dryrun ||
2072                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2073                         ns->ln_status = LS_SCANNING_PHASE2;
2074                         list_move_tail(&com->lc_link,
2075                                        &lfsck->li_list_double_scan);
2076                         if (!list_empty(&com->lc_link_dir))
2077                                 list_del_init(&com->lc_link_dir);
2078                         lfsck_pos_set_zero(pos);
2079                 } else {
2080                         ns->ln_status = LS_SCANNING_PHASE1;
2081                         ns->ln_run_time_phase1 = 0;
2082                         ns->ln_run_time_phase2 = 0;
2083                         ns->ln_items_checked = 0;
2084                         ns->ln_items_repaired = 0;
2085                         ns->ln_items_failed = 0;
2086                         ns->ln_dirs_checked = 0;
2087                         ns->ln_objs_checked_phase2 = 0;
2088                         ns->ln_objs_repaired_phase2 = 0;
2089                         ns->ln_objs_failed_phase2 = 0;
2090                         ns->ln_objs_nlink_repaired = 0;
2091                         ns->ln_objs_lost_found = 0;
2092                         ns->ln_dirent_repaired = 0;
2093                         ns->ln_linkea_repaired = 0;
2094                         ns->ln_mul_linked_checked = 0;
2095                         ns->ln_mul_linked_repaired = 0;
2096                         ns->ln_unknown_inconsistency = 0;
2097                         ns->ln_unmatched_pairs_repaired = 0;
2098                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
2099                         if (list_empty(&com->lc_link_dir))
2100                                 list_add_tail(&com->lc_link_dir,
2101                                               &lfsck->li_list_dir);
2102                         *pos = ns->ln_pos_first_inconsistent;
2103                 }
2104         } else {
2105                 ns->ln_status = LS_SCANNING_PHASE1;
2106                 if (list_empty(&com->lc_link_dir))
2107                         list_add_tail(&com->lc_link_dir,
2108                                       &lfsck->li_list_dir);
2109                 if (!lfsck->li_drop_dryrun ||
2110                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2111                         *pos = ns->ln_pos_last_checkpoint;
2112                         pos->lp_oit_cookie++;
2113                 } else {
2114                         *pos = ns->ln_pos_first_inconsistent;
2115                 }
2116         }
2117
2118         spin_unlock(&lfsck->li_lock);
2119         up_write(&com->lc_sem);
2120
2121         rc = lfsck_start_assistant(env, com, lsp);
2122
2123         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
2124                DFID", "LPX64"]: rc = %d\n",
2125                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
2126                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
2127
2128         return rc;
2129 }
2130
2131 static int lfsck_namespace_exec_oit(const struct lu_env *env,
2132                                     struct lfsck_component *com,
2133                                     struct dt_object *obj)
2134 {
2135         struct lfsck_thread_info *info  = lfsck_env_info(env);
2136         struct lfsck_namespace   *ns    = com->lc_file_ram;
2137         struct lfsck_instance    *lfsck = com->lc_lfsck;
2138         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
2139         struct lu_attr           *la    = &info->lti_la;
2140         struct lu_fid            *pfid  = &info->lti_fid2;
2141         struct lu_name           *cname = &info->lti_name;
2142         struct lu_seq_range      *range = &info->lti_range;
2143         struct dt_device         *dev   = lfsck->li_bottom;
2144         struct seq_server_site   *ss    =
2145                                 lu_site2seq(dev->dd_lu_dev.ld_site);
2146         struct linkea_data        ldata = { 0 };
2147         __u32                     idx   = lfsck_dev_idx(dev);
2148         int                       rc;
2149         ENTRY;
2150
2151         rc = lfsck_links_read(env, obj, &ldata);
2152         if (rc == -ENOENT)
2153                 GOTO(out, rc = 0);
2154
2155         /* -EINVAL means crashed linkEA, should be verified. */
2156         if (rc == -EINVAL) {
2157                 rc = lfsck_namespace_trace_update(env, com, fid,
2158                                                   LNTF_CHECK_LINKEA, true);
2159                 if (rc == 0) {
2160                         struct lustre_handle lh = { 0 };
2161
2162                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2163                                               MDS_INODELOCK_UPDATE |
2164                                               MDS_INODELOCK_XATTR, LCK_EX);
2165                         if (rc == 0) {
2166                                 rc = lfsck_namespace_links_remove(env, com,
2167                                                                   obj);
2168                                 lfsck_ibits_unlock(&lh, LCK_EX);
2169                         }
2170                 }
2171
2172                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
2173         }
2174
2175         /* zero-linkEA object may be orphan, but it also maybe because
2176          * of upgrading. Currently, we cannot record it for double scan.
2177          * Because it may cause the LFSCK tracing file to be too large. */
2178         if (rc == -ENODATA) {
2179                 if (S_ISDIR(lfsck_object_type(obj)))
2180                         GOTO(out, rc = 0);
2181
2182                 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2183                 if (rc != 0)
2184                         GOTO(out, rc);
2185
2186                 if (la->la_nlink > 1)
2187                         rc = lfsck_namespace_trace_update(env, com, fid,
2188                                                 LNTF_CHECK_LINKEA, true);
2189
2190                 GOTO(out, rc);
2191         }
2192
2193         if (rc != 0)
2194                 GOTO(out, rc);
2195
2196         /* Record multiple-linked object. */
2197         if (ldata.ld_leh->leh_reccount > 1) {
2198                 rc = lfsck_namespace_trace_update(env, com, fid,
2199                                                   LNTF_CHECK_LINKEA, true);
2200
2201                 GOTO(out, rc);
2202         }
2203
2204         linkea_first_entry(&ldata);
2205         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
2206         if (!fid_is_sane(pfid)) {
2207                 rc = lfsck_namespace_trace_update(env, com, fid,
2208                                                   LNTF_CHECK_PARENT, true);
2209         } else {
2210                 fld_range_set_mdt(range);
2211                 rc = fld_local_lookup(env, ss->ss_server_fld,
2212                                       fid_seq(pfid), range);
2213                 if ((rc == -ENOENT) ||
2214                     (rc == 0 && range->lsr_index != idx)) {
2215                         rc = lfsck_namespace_trace_update(env, com, fid,
2216                                                 LNTF_CHECK_LINKEA, true);
2217                 } else {
2218                         if (S_ISDIR(lfsck_object_type(obj)))
2219                                 GOTO(out, rc = 0);
2220
2221                         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2222                         if (rc != 0)
2223                                 GOTO(out, rc);
2224
2225                         if (la->la_nlink > 1)
2226                                 rc = lfsck_namespace_trace_update(env, com,
2227                                                 fid, LNTF_CHECK_LINKEA, true);
2228                 }
2229         }
2230
2231         GOTO(out, rc);
2232
2233 out:
2234         down_write(&com->lc_sem);
2235         com->lc_new_checked++;
2236         if (S_ISDIR(lfsck_object_type(obj)))
2237                 ns->ln_dirs_checked++;
2238         if (rc != 0)
2239                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2240         up_write(&com->lc_sem);
2241
2242         return rc;
2243 }
2244
2245 static int lfsck_namespace_exec_dir(const struct lu_env *env,
2246                                     struct lfsck_component *com,
2247                                     struct lu_dirent *ent, __u16 type)
2248 {
2249         struct lfsck_assistant_data     *lad    = com->lc_data;
2250         struct lfsck_namespace_req      *lnr;
2251         bool                             wakeup = false;
2252
2253         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
2254         if (IS_ERR(lnr)) {
2255                 struct lfsck_namespace *ns = com->lc_file_ram;
2256
2257                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2258                 return PTR_ERR(lnr);
2259         }
2260
2261         spin_lock(&lad->lad_lock);
2262         if (lad->lad_assistant_status < 0) {
2263                 spin_unlock(&lad->lad_lock);
2264                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
2265                 return lad->lad_assistant_status;
2266         }
2267
2268         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
2269         if (lad->lad_prefetched == 0)
2270                 wakeup = true;
2271
2272         lad->lad_prefetched++;
2273         spin_unlock(&lad->lad_lock);
2274         if (wakeup)
2275                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
2276
2277         down_write(&com->lc_sem);
2278         com->lc_new_checked++;
2279         up_write(&com->lc_sem);
2280
2281         return 0;
2282 }
2283
2284 static int lfsck_namespace_post(const struct lu_env *env,
2285                                 struct lfsck_component *com,
2286                                 int result, bool init)
2287 {
2288         struct lfsck_instance   *lfsck = com->lc_lfsck;
2289         struct lfsck_namespace  *ns    = com->lc_file_ram;
2290         int                      rc;
2291         ENTRY;
2292
2293         lfsck_post_generic(env, com, &result);
2294
2295         down_write(&com->lc_sem);
2296         spin_lock(&lfsck->li_lock);
2297         if (!init)
2298                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
2299         if (result > 0) {
2300                 ns->ln_status = LS_SCANNING_PHASE2;
2301                 ns->ln_flags |= LF_SCANNED_ONCE;
2302                 ns->ln_flags &= ~LF_UPGRADE;
2303                 list_del_init(&com->lc_link_dir);
2304                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
2305         } else if (result == 0) {
2306                 ns->ln_status = lfsck->li_status;
2307                 if (ns->ln_status == 0)
2308                         ns->ln_status = LS_STOPPED;
2309                 if (ns->ln_status != LS_PAUSED) {
2310                         list_del_init(&com->lc_link_dir);
2311                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
2312                 }
2313         } else {
2314                 ns->ln_status = LS_FAILED;
2315                 list_del_init(&com->lc_link_dir);
2316                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
2317         }
2318         spin_unlock(&lfsck->li_lock);
2319
2320         if (!init) {
2321                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2322                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2323                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
2324                 ns->ln_items_checked += com->lc_new_checked;
2325                 com->lc_new_checked = 0;
2326         }
2327
2328         rc = lfsck_namespace_store(env, com, false);
2329         up_write(&com->lc_sem);
2330
2331         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
2332                lfsck_lfsck2name(lfsck), rc);
2333
2334         RETURN(rc);
2335 }
2336
2337 static int
2338 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
2339                      struct seq_file *m)
2340 {
2341         struct lfsck_instance   *lfsck = com->lc_lfsck;
2342         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
2343         struct lfsck_namespace  *ns    = com->lc_file_ram;
2344         int                      rc;
2345
2346         down_read(&com->lc_sem);
2347         seq_printf(m, "name: lfsck_namespace\n"
2348                    "magic: %#x\n"
2349                    "version: %d\n"
2350                    "status: %s\n",
2351                    ns->ln_magic,
2352                    bk->lb_version,
2353                    lfsck_status2names(ns->ln_status));
2354
2355         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
2356         if (rc < 0)
2357                 goto out;
2358
2359         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
2360         if (rc < 0)
2361                 goto out;
2362
2363         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
2364                              "time_since_last_completed");
2365         if (rc < 0)
2366                 goto out;
2367
2368         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
2369                              "time_since_latest_start");
2370         if (rc < 0)
2371                 goto out;
2372
2373         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
2374                              "time_since_last_checkpoint");
2375         if (rc < 0)
2376                 goto out;
2377
2378         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
2379                             "latest_start_position");
2380         if (rc < 0)
2381                 goto out;
2382
2383         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
2384                             "last_checkpoint_position");
2385         if (rc < 0)
2386                 goto out;
2387
2388         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
2389                             "first_failure_position");
2390         if (rc < 0)
2391                 goto out;
2392
2393         if (ns->ln_status == LS_SCANNING_PHASE1) {
2394                 struct lfsck_position pos;
2395                 const struct dt_it_ops *iops;
2396                 cfs_duration_t duration = cfs_time_current() -
2397                                           lfsck->li_time_last_checkpoint;
2398                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
2399                 __u64 speed = checked;
2400                 __u64 new_checked = com->lc_new_checked * HZ;
2401                 __u32 rtime = ns->ln_run_time_phase1 +
2402                               cfs_duration_sec(duration + HALF_SEC);
2403
2404                 if (duration != 0)
2405                         do_div(new_checked, duration);
2406                 if (rtime != 0)
2407                         do_div(speed, rtime);
2408                 lfsck_namespace_dump_statistics(m, ns, checked,
2409                                                 ns->ln_objs_checked_phase2,
2410                                                 rtime, ns->ln_run_time_phase2);
2411
2412                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2413                               "average_speed_phase2: N/A\n"
2414                               "real_time_speed_phase1: "LPU64" items/sec\n"
2415                               "real_time_speed_phase2: N/A\n",
2416                               speed,
2417                               new_checked);
2418
2419                 LASSERT(lfsck->li_di_oit != NULL);
2420
2421                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
2422
2423                 /* The low layer otable-based iteration position may NOT
2424                  * exactly match the namespace-based directory traversal
2425                  * cookie. Generally, it is not a serious issue. But the
2426                  * caller should NOT make assumption on that. */
2427                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
2428                 if (!lfsck->li_current_oit_processed)
2429                         pos.lp_oit_cookie--;
2430
2431                 spin_lock(&lfsck->li_lock);
2432                 if (lfsck->li_di_dir != NULL) {
2433                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
2434                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
2435                                 fid_zero(&pos.lp_dir_parent);
2436                                 pos.lp_dir_cookie = 0;
2437                         } else {
2438                                 pos.lp_dir_parent =
2439                                         *lfsck_dto2fid(lfsck->li_obj_dir);
2440                         }
2441                 } else {
2442                         fid_zero(&pos.lp_dir_parent);
2443                         pos.lp_dir_cookie = 0;
2444                 }
2445                 spin_unlock(&lfsck->li_lock);
2446                 lfsck_pos_dump(m, &pos, "current_position");
2447         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
2448                 cfs_duration_t duration = cfs_time_current() -
2449                                           lfsck->li_time_last_checkpoint;
2450                 __u64 checked = ns->ln_objs_checked_phase2 +
2451                                 com->lc_new_checked;
2452                 __u64 speed1 = ns->ln_items_checked;
2453                 __u64 speed2 = checked;
2454                 __u64 new_checked = com->lc_new_checked * HZ;
2455                 __u32 rtime = ns->ln_run_time_phase2 +
2456                               cfs_duration_sec(duration + HALF_SEC);
2457
2458                 if (duration != 0)
2459                         do_div(new_checked, duration);
2460                 if (ns->ln_run_time_phase1 != 0)
2461                         do_div(speed1, ns->ln_run_time_phase1);
2462                 if (rtime != 0)
2463                         do_div(speed2, rtime);
2464                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
2465                                                 checked,
2466                                                 ns->ln_run_time_phase1, rtime);
2467
2468                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2469                               "average_speed_phase2: "LPU64" objs/sec\n"
2470                               "real_time_speed_phase1: N/A\n"
2471                               "real_time_speed_phase2: "LPU64" objs/sec\n"
2472                               "current_position: "DFID"\n",
2473                               speed1,
2474                               speed2,
2475                               new_checked,
2476                               PFID(&ns->ln_fid_latest_scanned_phase2));
2477         } else {
2478                 __u64 speed1 = ns->ln_items_checked;
2479                 __u64 speed2 = ns->ln_objs_checked_phase2;
2480
2481                 if (ns->ln_run_time_phase1 != 0)
2482                         do_div(speed1, ns->ln_run_time_phase1);
2483                 if (ns->ln_run_time_phase2 != 0)
2484                         do_div(speed2, ns->ln_run_time_phase2);
2485                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
2486                                                 ns->ln_objs_checked_phase2,
2487                                                 ns->ln_run_time_phase1,
2488                                                 ns->ln_run_time_phase2);
2489
2490                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2491                               "average_speed_phase2: "LPU64" objs/sec\n"
2492                               "real_time_speed_phase1: N/A\n"
2493                               "real_time_speed_phase2: N/A\n"
2494                               "current_position: N/A\n",
2495                               speed1,
2496                               speed2);
2497         }
2498 out:
2499         up_read(&com->lc_sem);
2500         return 0;
2501 }
2502
2503 static int lfsck_namespace_double_scan(const struct lu_env *env,
2504                                        struct lfsck_component *com)
2505 {
2506         struct lfsck_namespace *ns = com->lc_file_ram;
2507
2508         return lfsck_double_scan_generic(env, com, ns->ln_status);
2509 }
2510
2511 static void lfsck_namespace_data_release(const struct lu_env *env,
2512                                          struct lfsck_component *com)
2513 {
2514         struct lfsck_assistant_data     *lad    = com->lc_data;
2515         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
2516         struct lfsck_tgt_desc           *ltd;
2517         struct lfsck_tgt_desc           *next;
2518
2519         LASSERT(lad != NULL);
2520         LASSERT(thread_is_init(&lad->lad_thread) ||
2521                 thread_is_stopped(&lad->lad_thread));
2522         LASSERT(list_empty(&lad->lad_req_list));
2523
2524         com->lc_data = NULL;
2525
2526         spin_lock(&ltds->ltd_lock);
2527         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
2528                                  ltd_namespace_phase_list) {
2529                 list_del_init(&ltd->ltd_namespace_phase_list);
2530         }
2531         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
2532                                  ltd_namespace_phase_list) {
2533                 list_del_init(&ltd->ltd_namespace_phase_list);
2534         }
2535         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
2536                                  ltd_namespace_list) {
2537                 list_del_init(&ltd->ltd_namespace_list);
2538         }
2539         spin_unlock(&ltds->ltd_lock);
2540
2541         CFS_FREE_BITMAP(lad->lad_bitmap);
2542
2543         OBD_FREE_PTR(lad);
2544 }
2545
2546 static int lfsck_namespace_in_notify(const struct lu_env *env,
2547                                      struct lfsck_component *com,
2548                                      struct lfsck_request *lr)
2549 {
2550         struct lfsck_instance           *lfsck = com->lc_lfsck;
2551         struct lfsck_namespace          *ns    = com->lc_file_ram;
2552         struct lfsck_assistant_data     *lad   = com->lc_data;
2553         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
2554         struct lfsck_tgt_desc           *ltd;
2555         bool                             fail  = false;
2556         ENTRY;
2557
2558         if (lr->lr_event != LE_PHASE1_DONE &&
2559             lr->lr_event != LE_PHASE2_DONE &&
2560             lr->lr_event != LE_PEER_EXIT)
2561                 RETURN(-EINVAL);
2562
2563         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
2564                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
2565                lr->lr_index, lr->lr_status);
2566
2567         spin_lock(&ltds->ltd_lock);
2568         ltd = LTD_TGT(ltds, lr->lr_index);
2569         if (ltd == NULL) {
2570                 spin_unlock(&ltds->ltd_lock);
2571
2572                 RETURN(-ENXIO);
2573         }
2574
2575         list_del_init(&ltd->ltd_namespace_phase_list);
2576         switch (lr->lr_event) {
2577         case LE_PHASE1_DONE:
2578                 if (lr->lr_status <= 0) {
2579                         ltd->ltd_namespace_done = 1;
2580                         list_del_init(&ltd->ltd_namespace_list);
2581                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
2582                                "phase1 for namespace LFSCK: rc = %d.\n",
2583                                lfsck_lfsck2name(lfsck),
2584                                ltd->ltd_index, lr->lr_status);
2585                         ns->ln_flags |= LF_INCOMPLETE;
2586                         fail = true;
2587                         break;
2588                 }
2589
2590                 if (list_empty(&ltd->ltd_namespace_list))
2591                         list_add_tail(&ltd->ltd_namespace_list,
2592                                       &lad->lad_mdt_list);
2593                 list_add_tail(&ltd->ltd_namespace_phase_list,
2594                               &lad->lad_mdt_phase2_list);
2595                 break;
2596         case LE_PHASE2_DONE:
2597                 ltd->ltd_namespace_done = 1;
2598                 list_del_init(&ltd->ltd_namespace_list);
2599                 break;
2600         case LE_PEER_EXIT:
2601                 fail = true;
2602                 ltd->ltd_namespace_done = 1;
2603                 list_del_init(&ltd->ltd_namespace_list);
2604                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
2605                         CDEBUG(D_LFSCK,
2606                                "%s: the peer MDT %x exit namespace LFSCK\n",
2607                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2608                         ns->ln_flags |= LF_INCOMPLETE;
2609                 }
2610                 break;
2611         default:
2612                 break;
2613         }
2614         spin_unlock(&ltds->ltd_lock);
2615
2616         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
2617                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2618
2619                 memset(stop, 0, sizeof(*stop));
2620                 stop->ls_status = lr->lr_status;
2621                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2622                 lfsck_stop(env, lfsck->li_bottom, stop);
2623         } else if (lfsck_phase2_next_ready(lad)) {
2624                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
2625         }
2626
2627         RETURN(0);
2628 }
2629
2630 static int lfsck_namespace_query(const struct lu_env *env,
2631                                  struct lfsck_component *com)
2632 {
2633         struct lfsck_namespace *ns = com->lc_file_ram;
2634
2635         return ns->ln_status;
2636 }
2637
2638 static struct lfsck_operations lfsck_namespace_ops = {
2639         .lfsck_reset            = lfsck_namespace_reset,
2640         .lfsck_fail             = lfsck_namespace_fail,
2641         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
2642         .lfsck_prep             = lfsck_namespace_prep,
2643         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
2644         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
2645         .lfsck_post             = lfsck_namespace_post,
2646         .lfsck_dump             = lfsck_namespace_dump,
2647         .lfsck_double_scan      = lfsck_namespace_double_scan,
2648         .lfsck_data_release     = lfsck_namespace_data_release,
2649         .lfsck_quit             = lfsck_quit_generic,
2650         .lfsck_in_notify        = lfsck_namespace_in_notify,
2651         .lfsck_query            = lfsck_namespace_query,
2652 };
2653
/*
 * First-stage assistant handler: verify one directory name entry against
 * the linkEA of the object that the entry references.
 *
 * The request "lar" is embedded in a struct lfsck_namespace_req, which
 * carries the parent directory (lnr_obj), the entry name (lnr_name and
 * lnr_namelen), the child FID (lnr_fid) and the dirent attributes (lnr_attr).
 *
 * Outline of the processing:
 * - A dirent flagged LUDA_UPGRADE or LUDA_REPAIR is counted as a repaired
 *   dirent.
 * - A zero child FID is only tolerated for the ".." entry; the parent
 *   directory is then recorded with LNTF_CHECK_PARENT.
 * - "." (and '.'-prefixed names whose FID sequence satisfies
 *   fid_seq_is_dot()) is skipped.
 * - The child object is looked up either via lfsck->li_next (local MDT) or
 *   via the target device of the MDT that owns the FID. ".." is skipped when
 *   local and recorded with LNTF_CHECK_PARENT when remote.
 * - If the linkEA is missing, corrupted, or lacks the <name, parent FID>
 *   pair, it is fixed inside a transaction, unless LPF_DRYRUN is set, in
 *   which case the repair is only counted.
 * - A non-directory object that has more than one linkEA record or a link
 *   count other than 1 is recorded with LNTF_CHECK_LINKEA.
 *
 * Returns 0 on success, and also on failure when LPF_FAILOUT is not set
 * (the failure is recorded instead); otherwise a negative error number.
 */
2654 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
2655                                                 struct lfsck_component *com,
2656                                                 struct lfsck_assistant_req *lar)
2657 {
2658         struct lfsck_thread_info   *info     = lfsck_env_info(env);
2659         struct lu_attr             *la       = &info->lti_la;
2660         struct lfsck_instance      *lfsck    = com->lc_lfsck;
2661         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
2662         struct lfsck_namespace     *ns       = com->lc_file_ram;
2663         struct linkea_data          ldata    = { 0 };
2664         const struct lu_name       *cname;
2665         struct thandle             *handle   = NULL;
2666         struct lfsck_namespace_req *lnr      =
2667                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
2668         struct dt_object           *dir      = lnr->lnr_obj;
2669         struct dt_object           *obj      = NULL;
2670         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
2671         struct dt_device           *dev;
2672         struct lustre_handle        lh       = { 0 };
2673         bool                        repaired = false;
2674         bool                        dtlocked = false;
2675         bool                        remove;
2676         bool                        newdata;
2677         bool                        log      = false;
2678         int                         idx;
2679         int                         count    = 0;
2680         int                         rc;
2681         ENTRY;
2682
        /* The dirent itself was flagged as upgraded/repaired by the lower
         * layer: account for it before looking at the linkEA. */
2683         if (lnr->lnr_attr & LUDA_UPGRADE) {
2684                 ns->ln_flags |= LF_UPGRADE;
2685                 ns->ln_dirent_repaired++;
2686                 repaired = true;
2687         } else if (lnr->lnr_attr & LUDA_REPAIR) {
2688                 ns->ln_flags |= LF_INCONSISTENT;
2689                 ns->ln_dirent_repaired++;
2690                 repaired = true;
2691         }
2692
        /* A zero FID is only expected for "..": record the directory so
         * that its parent gets checked (LNTF_CHECK_PARENT). */
2693         if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
2694                 if (strcmp(lnr->lnr_name, dotdot) != 0)
2695                         LBUG();
2696                 else
2697                         rc = lfsck_namespace_trace_update(env, com, pfid,
2698                                                 LNTF_CHECK_PARENT, true);
2699
2700                 GOTO(out, rc);
2701         }
2702
2703         if (lnr->lnr_name[0] == '.' &&
2704             (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
2705                 GOTO(out, rc = 0);
2706
2707         idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
2708         if (idx < 0)
2709                 GOTO(out, rc = idx);
2710
2711         if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
2712                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
2713                         GOTO(out, rc = 0);
2714
2715                 dev = lfsck->li_next;
2716         } else {
2717                 struct lfsck_tgt_desc *ltd;
2718
2719                 /* Usually, some local filesystem consistency verification
2720                  * tools can guarantee the local namespace tree consistency.
2721                  * So the LFSCK will only verify the remote directory. */
2722                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
2723                         rc = lfsck_namespace_trace_update(env, com, pfid,
2724                                                 LNTF_CHECK_PARENT, true);
2725
2726                         GOTO(out, rc);
2727                 }
2728
2729                 ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
2730                 if (unlikely(ltd == NULL)) {
2731                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
2732                                "did not join the namespace LFSCK\n",
2733                                lfsck_lfsck2name(lfsck), idx);
2734                         ns->ln_flags |= LF_INCOMPLETE;
2735
2736                         GOTO(out, rc = -ENODEV);
2737                 }
2738
2739                 dev = ltd->ltd_tgt;
2740         }
2741
2742         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
2743         if (IS_ERR(obj))
2744                 GOTO(out, rc = PTR_ERR(obj));
2745
2746         if (dt_object_exists(obj) == 0) {
2747                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
2748                 if (rc != 0)
2749                         GOTO(out, rc);
2750
2751                 /* XXX: dangling name entry, will handle it in other patch. */
2752                 GOTO(out, rc);
2753         }
2754
2755         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
        /* Lock the object and open a transaction right away only if the
         * dirent was already repaired and this is not a dry run. Otherwise
         * the "again" label below is reached later via "goto again", once
         * the linkEA turns out to need a repair. */
2756         if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
2757
2758 again:
2759                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2760                                       MDS_INODELOCK_UPDATE |
2761                                       MDS_INODELOCK_XATTR, LCK_EX);
2762                 if (rc != 0)
2763                         GOTO(out, rc);
2764
2765                 handle = dt_trans_create(env, dev);
2766                 if (IS_ERR(handle))
2767                         GOTO(out, rc = PTR_ERR(handle));
2768
2769                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
2770                 if (rc != 0)
2771                         GOTO(stop, rc);
2772
2773                 rc = dt_trans_start(env, dev, handle);
2774                 if (rc != 0)
2775                         GOTO(stop, rc);
2776
2777                 dt_write_lock(env, obj, 0);
2778                 dtlocked = true;
2779         }
2780
        /* Also executed again after "goto again", i.e. with the lock held. */
2781         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
2782         if (rc != 0)
2783                 GOTO(stop, rc);
2784
2785         rc = lfsck_links_read(env, obj, &ldata);
2786         if (rc == 0) {
2787                 count = ldata.ld_leh->leh_reccount;
2788                 rc = linkea_links_find(&ldata, cname, pfid);
2789                 if ((rc == 0) &&
2790                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
2791                         goto record;
2792
2793                 ns->ln_flags |= LF_INCONSISTENT;
2794                 /* For sub-dir object, we cannot make sure whether the sub-dir
2795                  * back references the parent via ".." name entry correctly or
2796                  * not in the LFSCK first-stage scanning. It may be that the
2797                  * (remote) sub-dir ".." name entry has no parent FID after
2798                  * file-level backup/restore and its linkEA may be wrong.
2799                  * So under such case, we should replace the linkEA according
2800                  * to current name entry. But this needs to be done during the
2801                  * LFSCK second-stage scanning. The LFSCK will record the name
2802                  * entry for further possible using. */
2803                 remove = false;
2804                 newdata = false;
2805                 goto nodata;
2806         } else if (unlikely(rc == -EINVAL)) {
2807                 count = 1;
2808                 ns->ln_flags |= LF_INCONSISTENT;
2809                 /* The magic crashed, we are not sure whether there are more
2810                  * corrupt data in the linkea, so remove all linkea entries. */
2811                 remove = true;
2812                 newdata = true;
2813                 goto nodata;
2814         } else if (rc == -ENODATA) {
2815                 count = 1;
2816                 ns->ln_flags |= LF_UPGRADE;
2817                 remove = false;
2818                 newdata = true;
2819
2820 nodata:
                /* Dry run: only account for the repair that would be made. */
2821                 if (bk->lb_param & LPF_DRYRUN) {
2822                         ns->ln_linkea_repaired++;
2823                         repaired = true;
2824                         log = true;
2825                         goto record;
2826                 }
2827
                /* No lock taken yet: take it, open the transaction and
                 * redo the checks above before modifying anything. */
2828                 if (!lustre_handle_is_used(&lh))
2829                         goto again;
2830
2831                 if (remove) {
2832                         LASSERT(newdata);
2833
2834                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
2835                                           BYPASS_CAPA);
2836                         if (rc != 0)
2837                                 GOTO(stop, rc);
2838                 }
2839
2840                 if (newdata) {
2841                         rc = linkea_data_new(&ldata,
2842                                         &lfsck_env_info(env)->lti_linkea_buf);
2843                         if (rc != 0)
2844                                 GOTO(stop, rc);
2845                 }
2846
2847                 rc = linkea_add_buf(&ldata, cname, pfid);
2848                 if (rc != 0)
2849                         GOTO(stop, rc);
2850
2851                 rc = lfsck_links_write(env, obj, &ldata, handle);
2852                 if (rc != 0)
2853                         GOTO(stop, rc);
2854
2855                 count = ldata.ld_leh->leh_reccount;
                /* A remote directory is not counted as repaired here. */
2856                 if (!S_ISDIR(lfsck_object_type(obj)) ||
2857                     !dt_object_remote(obj)) {
2858                         ns->ln_linkea_repaired++;
2859                         repaired = true;
2860                         log = true;
2861                 }
2862         } else if (rc == -ENOENT) {
2863                 log = false;
2864                 repaired = false;
2865
2866                 GOTO(stop, rc = 0);
2867         } else {
2868                 GOTO(stop, rc);
2869         }
2870
2871 record:
2872         LASSERT(count > 0);
2873
2874         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2875         if (rc != 0)
2876                 GOTO(stop, rc);
2877
2878         if ((count == 1 && la->la_nlink == 1) ||
2879             S_ISDIR(lfsck_object_type(obj)))
2880                 /* Usually, it is for single linked object or dir, do nothing.*/
2881                 GOTO(stop, rc);
2882
2883         /* Following modification will be in another transaction.  */
2884         if (handle != NULL) {
2885                 LASSERT(dt_write_locked(env, obj));
2886
2887                 dt_write_unlock(env, obj);
2888                 dtlocked = false;
2889
2890                 dt_trans_stop(env, dev, handle);
2891                 handle = NULL;
2892
2893                 lfsck_ibits_unlock(&lh, LCK_EX);
2894         }
2895
2896         ns->ln_mul_linked_checked++;
2897         rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
2898                                           LNTF_CHECK_LINKEA, true);
2899
2900         GOTO(out, rc);
2901
2902 stop:
2903         if (dtlocked)
2904                 dt_write_unlock(env, obj);
2905
2906         if (handle != NULL && !IS_ERR(handle))
2907                 dt_trans_stop(env, dev, handle);
2908
2909 out:
2910         lfsck_ibits_unlock(&lh, LCK_EX);
2911         down_write(&com->lc_sem);
2912         if (rc < 0) {
2913                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
2914                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
2915                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
2916                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
2917                        lnr->lnr_namelen, lnr->lnr_name, rc);
2918
2919                 lfsck_namespace_record_failure(env, lfsck, ns);
                /* Without LPF_FAILOUT the failure is recorded above and
                 * success is returned to the caller. */
2920                 if (!(bk->lb_param & LPF_FAILOUT))
2921                         rc = 0;
2922         } else {
2923                 if (log)
2924                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
2925                                "repaired the entry: "DFID", parent "DFID
2926                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
2927                                PFID(&lnr->lnr_fid),
2928                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
2929                                lnr->lnr_namelen, lnr->lnr_name);
2930
2931                 if (repaired) {
2932                         ns->ln_items_repaired++;
2933                         if (bk->lb_param & LPF_DRYRUN &&
2934                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
2935                                 lfsck_pos_fill(env, lfsck,
2936                                                &ns->ln_pos_first_inconsistent,
2937                                                false);
2938                 }
2939                 rc = 0;
2940         }
2941         up_write(&com->lc_sem);
2942
2943         if (obj != NULL && !IS_ERR(obj))
2944                 lfsck_object_put(env, obj);
2945         return rc;
2946 }
2947
2948 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
2949                                                 struct lfsck_component *com)
2950 {
2951         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2952         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2953         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
2954         struct lfsck_namespace  *ns     = com->lc_file_ram;
2955         struct dt_object        *obj    = com->lc_obj;
2956         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
2957         struct dt_object        *target;
2958         struct dt_it            *di;
2959         struct dt_key           *key;
2960         struct lu_fid            fid;
2961         int                      rc;
2962         __u8                     flags  = 0;
2963         ENTRY;
2964
2965         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
2966                lfsck_lfsck2name(lfsck));
2967
2968         com->lc_new_checked = 0;
2969         com->lc_new_scanned = 0;
2970         com->lc_time_last_checkpoint = cfs_time_current();
2971         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
2972                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2973
2974         di = iops->init(env, obj, 0, BYPASS_CAPA);
2975         if (IS_ERR(di))
2976                 RETURN(PTR_ERR(di));
2977
2978         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
2979         rc = iops->get(env, di, (const struct dt_key *)&fid);
2980         if (rc < 0)
2981                 GOTO(fini, rc);
2982
2983         /* Skip the start one, which either has been processed or non-exist. */
2984         rc = iops->next(env, di);
2985         if (rc != 0)
2986                 GOTO(put, rc);
2987
2988         do {
2989                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2990                     cfs_fail_val > 0) {
2991                         struct l_wait_info lwi;
2992
2993                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2994                                           NULL, NULL);
2995                         l_wait_event(thread->t_ctl_waitq,
2996                                      !thread_is_running(thread),
2997                                      &lwi);
2998
2999                         if (unlikely(!thread_is_running(thread)))
3000                                 GOTO(put, rc = 0);
3001                 }
3002
3003                 key = iops->key(env, di);
3004                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
3005                 if (!fid_is_sane(&fid)) {
3006                         rc = 0;
3007                         goto checkpoint;
3008                 }
3009
3010                 target = lfsck_object_find(env, lfsck, &fid);
3011                 if (IS_ERR(target)) {
3012                         rc = PTR_ERR(target);
3013                         goto checkpoint;
3014                 }
3015
3016                 if (dt_object_exists(target)) {
3017                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
3018                         if (rc == 0) {
3019                                 rc = lfsck_namespace_double_scan_one(env, com,
3020                                                                 target, flags);
3021                                 if (rc == -ENOENT)
3022                                         rc = 0;
3023                         }
3024                 }
3025
3026                 lfsck_object_put(env, target);
3027
3028 checkpoint:
3029                 down_write(&com->lc_sem);
3030                 com->lc_new_checked++;
3031                 com->lc_new_scanned++;
3032                 ns->ln_fid_latest_scanned_phase2 = fid;
3033                 if (rc > 0)
3034                         ns->ln_objs_repaired_phase2++;
3035                 else if (rc < 0)
3036                         ns->ln_objs_failed_phase2++;
3037                 up_write(&com->lc_sem);
3038
3039                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3040                         GOTO(put, rc);
3041
3042                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
3043                                               cfs_time_current())) &&
3044                     com->lc_new_checked != 0) {
3045                         down_write(&com->lc_sem);
3046                         ns->ln_run_time_phase2 +=
3047                                 cfs_duration_sec(cfs_time_current() +
3048                                 HALF_SEC - com->lc_time_last_checkpoint);
3049                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
3050                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
3051                         com->lc_new_checked = 0;
3052                         rc = lfsck_namespace_store(env, com, false);
3053                         up_write(&com->lc_sem);
3054                         if (rc != 0)
3055                                 GOTO(put, rc);
3056
3057                         com->lc_time_last_checkpoint = cfs_time_current();
3058                         com->lc_time_next_checkpoint =
3059                                 com->lc_time_last_checkpoint +
3060                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3061                 }
3062
3063                 lfsck_control_speed_by_self(com);
3064                 if (unlikely(!thread_is_running(thread)))
3065                         GOTO(put, rc = 0);
3066
3067                 rc = iops->next(env, di);
3068         } while (rc == 0);
3069
3070         GOTO(put, rc);
3071
3072 put:
3073         iops->put(env, di);
3074
3075 fini:
3076         iops->fini(env, di);
3077
3078         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
3079                lfsck_lfsck2name(lfsck), rc);
3080
3081         return rc;
3082 }
3083
3084 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
3085                                                struct lfsck_component *com,
3086                                                struct lfsck_position *pos)
3087 {
3088         struct lfsck_assistant_data     *lad = com->lc_data;
3089         struct lfsck_namespace_req      *lnr;
3090
3091         if (list_empty(&lad->lad_req_list))
3092                 return;
3093
3094         lnr = list_entry(lad->lad_req_list.next,
3095                          struct lfsck_namespace_req,
3096                          lnr_lar.lar_list);
3097         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
3098         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
3099         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
3100 }
3101
3102 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
3103                                               struct lfsck_component *com,
3104                                               int rc)
3105 {
3106         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3107         struct lfsck_namespace  *ns     = com->lc_file_ram;
3108
3109         down_write(&com->lc_sem);
3110         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
3111                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3112         ns->ln_time_last_checkpoint = cfs_time_current_sec();
3113         ns->ln_objs_checked_phase2 += com->lc_new_checked;
3114         com->lc_new_checked = 0;
3115
3116         if (rc > 0) {
3117                 if (ns->ln_flags & LF_INCOMPLETE)
3118                         ns->ln_status = LS_PARTIAL;
3119                 else
3120                         ns->ln_status = LS_COMPLETED;
3121                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
3122                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
3123                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
3124                 ns->ln_success_count++;
3125         } else if (rc == 0) {
3126                 ns->ln_status = lfsck->li_status;
3127                 if (ns->ln_status == 0)
3128                         ns->ln_status = LS_STOPPED;
3129         } else {
3130                 ns->ln_status = LS_FAILED;
3131         }
3132
3133         rc = lfsck_namespace_store(env, com, false);
3134         up_write(&com->lc_sem);
3135
3136         return rc;
3137 }
3138
/**
 * Placeholder for the assistant's "sync failures" callback.
 *
 * Not implemented yet: the function has no effect and ignores all of its
 * arguments.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lr	pointer to the lfsck request
 */
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
						    struct lfsck_component *com,
						    struct lfsck_request *lr)
{
	/* XXX: TBD */
}
3145
3146 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
3147         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
3148         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
3149         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
3150         .la_double_scan_result  = lfsck_namespace_double_scan_result,
3151         .la_req_fini            = lfsck_namespace_assistant_req_fini,
3152         .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
3153 };
3154
3155 /**
3156  * Verify the specified linkEA entry for the given directory object.
3157  * If the object has no such linkEA entry or it has more other linkEA
3158  * entries, then re-generate the linkEA with the given information.
3159  *
3160  * \param[in] env       pointer to the thread context
3161  * \param[in] dev       pointer to the dt_device
3162  * \param[in] obj       pointer to the dt_object to be handled
3163  * \param[in] cname     the name for the child in the parent directory
3164  * \param[in] pfid      the parent directory's FID for the linkEA
3165  *
3166  * \retval              0 for success
3167  * \retval              negative error number on failure
3168  */
3169 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
3170                         struct dt_object *obj, const struct lu_name *cname,
3171                         const struct lu_fid *pfid)
3172 {
3173         struct linkea_data       ldata  = { 0 };
3174         struct lu_buf            linkea_buf;
3175         struct thandle          *th;
3176         int                      rc;
3177         int                      fl     = LU_XATTR_CREATE;
3178         bool                     dirty  = false;
3179         ENTRY;
3180
3181         LASSERT(S_ISDIR(lfsck_object_type(obj)));
3182
3183         rc = lfsck_links_read(env, obj, &ldata);
3184         if (rc == -ENODATA) {
3185                 dirty = true;
3186         } else if (rc == 0) {
3187                 fl = LU_XATTR_REPLACE;
3188                 if (ldata.ld_leh->leh_reccount != 1) {
3189                         dirty = true;
3190                 } else {
3191                         rc = linkea_links_find(&ldata, cname, pfid);
3192                         if (rc != 0)
3193                                 dirty = true;
3194                 }
3195         }
3196
3197         if (!dirty)
3198                 RETURN(rc);
3199
3200         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
3201         if (rc != 0)
3202                 RETURN(rc);
3203
3204         rc = linkea_add_buf(&ldata, cname, pfid);
3205         if (rc != 0)
3206                 RETURN(rc);
3207
3208         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
3209                        ldata.ld_leh->leh_len);
3210         th = dt_trans_create(env, dev);
3211         if (IS_ERR(th))
3212                 RETURN(PTR_ERR(th));
3213
3214         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
3215                                   XATTR_NAME_LINK, fl, th);
3216         if (rc != 0)
3217                 GOTO(stop, rc);
3218
3219         rc = dt_trans_start_local(env, dev, th);
3220         if (rc != 0)
3221                 GOTO(stop, rc);
3222
3223         dt_write_lock(env, obj, 0);
3224         rc = dt_xattr_set(env, obj, &linkea_buf,
3225                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
3226         dt_write_unlock(env, obj);
3227
3228         GOTO(stop, rc);
3229
3230 stop:
3231         dt_trans_stop(env, dev, th);
3232         return rc;
3233 }
3234
3235 /**
3236  * Get the name and parent directory's FID from the first linkEA entry.
3237  *
3238  * \param[in] env       pointer to the thread context
3239  * \param[in] obj       pointer to the object which get linkEA from
3240  * \param[out] name     pointer to the buffer to hold the name
3241  *                      in the first linkEA entry
3242  * \param[out] pfid     pointer to the buffer to hold the parent
3243  *                      directory's FID in the first linkEA entry
3244  *
3245  * \retval              0 for success
3246  * \retval              negative error number on failure
3247  */
3248 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
3249                           char *name, struct lu_fid *pfid)
3250 {
3251         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
3252         struct linkea_data        ldata = { 0 };
3253         int                       rc;
3254
3255         rc = lfsck_links_read(env, obj, &ldata);
3256         if (rc != 0)
3257                 return rc;
3258
3259         linkea_first_entry(&ldata);
3260         if (ldata.ld_lee == NULL)
3261                 return -ENODATA;
3262
3263         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
3264         /* To guarantee the 'name' is terminated with '0'. */
3265         memcpy(name, cname->ln_name, cname->ln_namelen);
3266         name[cname->ln_namelen] = 0;
3267
3268         return 0;
3269 }
3270
3271 /**
3272  * Remove the name entry from the parent directory.
3273  *
3274  * No need to care about the object referenced by the name entry,
3275  * either the name entry is invalid or redundant, or the referenced
3276  * object has been processed has been or will be handled by others.
3277  *
3278  * \param[in] env       pointer to the thread context
3279  * \param[in] lfsck     pointer to the lfsck instance
3280  * \param[in] parent    pointer to the lost+found object
3281  * \param[in] name      the name for the name entry to be removed
3282  * \param[in] type      the type for the name entry to be removed
3283  *
3284  * \retval              0 for success
3285  * \retval              negative error number on failure
3286  */
3287 int lfsck_remove_name_entry(const struct lu_env *env,
3288                             struct lfsck_instance *lfsck,
3289                             struct dt_object *parent,
3290                             const char *name, __u32 type)
3291 {
3292         struct dt_device        *dev    = lfsck->li_next;
3293         struct thandle          *th;
3294         struct lustre_handle     lh     = { 0 };
3295         int                      rc;
3296         ENTRY;
3297
3298         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3299                               MDS_INODELOCK_UPDATE, LCK_EX);
3300         if (rc != 0)
3301                 RETURN(rc);
3302
3303         th = dt_trans_create(env, dev);
3304         if (IS_ERR(th))
3305                 GOTO(unlock, rc = PTR_ERR(th));
3306
3307         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
3308         if (rc != 0)
3309                 GOTO(stop, rc);
3310
3311         if (S_ISDIR(type)) {
3312                 rc = dt_declare_ref_del(env, parent, th);
3313                 if (rc != 0)
3314                         GOTO(stop, rc);
3315         }
3316
3317         rc = dt_trans_start(env, dev, th);
3318         if (rc != 0)
3319                 GOTO(stop, rc);
3320
3321         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
3322                        BYPASS_CAPA);
3323         if (rc != 0)
3324                 GOTO(stop, rc);
3325
3326         if (S_ISDIR(type)) {
3327                 dt_write_lock(env, parent, 0);
3328                 rc = dt_ref_del(env, parent, th);
3329                 dt_write_unlock(env, parent);
3330         }
3331
3332         GOTO(stop, rc);
3333
3334 stop:
3335         dt_trans_stop(env, dev, th);
3336
3337 unlock:
3338         lfsck_ibits_unlock(&lh, LCK_EX);
3339
3340         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
3341                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
3342                PFID(lfsck_dto2fid(parent)), name, type, rc);
3343
3344         return rc;
3345 }
3346
3347 /**
3348  * Update the object's name entry with the given FID.
3349  *
3350  * \param[in] env       pointer to the thread context
3351  * \param[in] lfsck     pointer to the lfsck instance
3352  * \param[in] parent    pointer to the parent directory that holds
3353  *                      the name entry
3354  * \param[in] name      the name for the entry to be updated
3355  * \param[in] pfid      the new PFID for the name entry
3356  * \param[in] type      the type for the name entry to be updated
3357  *
3358  * \retval              0 for success
3359  * \retval              negative error number on failure
3360  */
3361 int lfsck_update_name_entry(const struct lu_env *env,
3362                             struct lfsck_instance *lfsck,
3363                             struct dt_object *parent, const char *name,
3364                             const struct lu_fid *pfid, __u32 type)
3365 {
3366         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
3367         struct dt_device        *dev    = lfsck->li_next;
3368         struct lustre_handle     lh     = { 0 };
3369         struct thandle          *th;
3370         int                      rc;
3371         bool                     exists = true;
3372         ENTRY;
3373
3374         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3375                               MDS_INODELOCK_UPDATE, LCK_EX);
3376         if (rc != 0)
3377                 RETURN(rc);
3378
3379         th = dt_trans_create(env, dev);
3380         if (IS_ERR(th))
3381                 GOTO(unlock, rc = PTR_ERR(th));
3382
3383         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
3384         if (rc != 0)
3385                 GOTO(stop, rc);
3386
3387         rec->rec_type = type;
3388         rec->rec_fid = pfid;
3389         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
3390                                (const struct dt_key *)name, th);
3391         if (rc != 0)
3392                 GOTO(stop, rc);
3393
3394         rc = dt_declare_ref_add(env, parent, th);
3395         if (rc != 0)
3396                 GOTO(stop, rc);
3397
3398         rc = dt_trans_start(env, dev, th);
3399         if (rc != 0)
3400                 GOTO(stop, rc);
3401
3402         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
3403                        BYPASS_CAPA);
3404         if (rc == -ENOENT) {
3405                 exists = false;
3406                 rc = 0;
3407         }
3408
3409         if (rc != 0)
3410                 GOTO(stop, rc);
3411
3412         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
3413                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
3414         if (rc == 0 && S_ISDIR(type) && !exists) {
3415                 dt_write_lock(env, parent, 0);
3416                 rc = dt_ref_add(env, parent, th);
3417                 dt_write_unlock(env, parent);
3418         }
3419
3420         GOTO(stop, rc);
3421
3422 stop:
3423         dt_trans_stop(env, dev, th);
3424
3425 unlock:
3426         lfsck_ibits_unlock(&lh, LCK_EX);
3427
3428         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
3429                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
3430                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
3431
3432         return rc;
3433 }
3434
3435 int lfsck_namespace_setup(const struct lu_env *env,
3436                           struct lfsck_instance *lfsck)
3437 {
3438         struct lfsck_component  *com;
3439         struct lfsck_namespace  *ns;
3440         struct dt_object        *root = NULL;
3441         struct dt_object        *obj;
3442         int                      rc;
3443         ENTRY;
3444
3445         LASSERT(lfsck->li_master);
3446
3447         OBD_ALLOC_PTR(com);
3448         if (com == NULL)
3449                 RETURN(-ENOMEM);
3450
3451         INIT_LIST_HEAD(&com->lc_link);
3452         INIT_LIST_HEAD(&com->lc_link_dir);
3453         init_rwsem(&com->lc_sem);
3454         atomic_set(&com->lc_ref, 1);
3455         com->lc_lfsck = lfsck;
3456         com->lc_type = LFSCK_TYPE_NAMESPACE;
3457         com->lc_ops = &lfsck_namespace_ops;
3458         com->lc_data = lfsck_assistant_data_init(
3459                         &lfsck_namespace_assistant_ops,
3460                         "lfsck_namespace");
3461         if (com->lc_data == NULL)
3462                 GOTO(out, rc = -ENOMEM);
3463
3464         com->lc_file_size = sizeof(struct lfsck_namespace);
3465         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
3466         if (com->lc_file_ram == NULL)
3467                 GOTO(out, rc = -ENOMEM);
3468
3469         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
3470         if (com->lc_file_disk == NULL)
3471                 GOTO(out, rc = -ENOMEM);
3472
3473         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
3474         if (IS_ERR(root))
3475                 GOTO(out, rc = PTR_ERR(root));
3476
3477         if (unlikely(!dt_try_as_dir(env, root)))
3478                 GOTO(out, rc = -ENOTDIR);
3479
3480         obj = local_index_find_or_create(env, lfsck->li_los, root,
3481                                          lfsck_namespace_name,
3482                                          S_IFREG | S_IRUGO | S_IWUSR,
3483                                          &dt_lfsck_features);
3484         if (IS_ERR(obj))
3485                 GOTO(out, rc = PTR_ERR(obj));
3486
3487         com->lc_obj = obj;
3488         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
3489         if (rc != 0)
3490                 GOTO(out, rc);
3491
3492         rc = lfsck_namespace_load(env, com);
3493         if (rc > 0)
3494                 rc = lfsck_namespace_reset(env, com, true);
3495         else if (rc == -ENODATA)
3496                 rc = lfsck_namespace_init(env, com);
3497         if (rc != 0)
3498                 GOTO(out, rc);
3499
3500         ns = com->lc_file_ram;
3501         switch (ns->ln_status) {
3502         case LS_INIT:
3503         case LS_COMPLETED:
3504         case LS_FAILED:
3505         case LS_STOPPED:
3506                 spin_lock(&lfsck->li_lock);
3507                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
3508                 spin_unlock(&lfsck->li_lock);
3509                 break;
3510         default:
3511                 CERROR("%s: unknown lfsck_namespace status %d\n",
3512                        lfsck_lfsck2name(lfsck), ns->ln_status);
3513                 /* fall through */
3514         case LS_SCANNING_PHASE1:
3515         case LS_SCANNING_PHASE2:
3516                 /* No need to store the status to disk right now.
3517                  * If the system crashed before the status stored,
3518                  * it will be loaded back when next time. */
3519                 ns->ln_status = LS_CRASHED;
3520                 /* fall through */
3521         case LS_PAUSED:
3522         case LS_CRASHED:
3523                 spin_lock(&lfsck->li_lock);
3524                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
3525                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
3526                 spin_unlock(&lfsck->li_lock);
3527                 break;
3528         }
3529
3530         GOTO(out, rc = 0);
3531
3532 out:
3533         if (root != NULL && !IS_ERR(root))
3534                 lu_object_put(env, &root->do_lu);
3535         if (rc != 0) {
3536                 lfsck_component_cleanup(env, com);
3537                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
3538                        lfsck_lfsck2name(lfsck), rc);
3539         }
3540         return rc;
3541 }