Whamcloud - gitweb
8a688c79e07c3eee59ad8782ea71061ef240b0b7
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/*
 * Magic value stored in lfsck_namespace::ln_magic; lfsck_namespace_load()
 * treats a loaded record whose magic differs from this as broken.
 */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/*
 * Positive status codes reported by lfsck_namespace_check_exist() when the
 * checked name entry is not in the expected state (0 means it is).
 */
enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
        LFSCK_NAMEENTRY_REMOVED         = 2, /* The entry has been removed. */
        LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
};

/* NOTE(review): presumably the component/file name; not used in this part. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 static struct lfsck_namespace_req *
55 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
56                                    struct lu_dirent *ent, __u16 type)
57 {
58         struct lfsck_namespace_req *lnr;
59         int                         size;
60
61         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
62         OBD_ALLOC(lnr, size);
63         if (lnr == NULL)
64                 return ERR_PTR(-ENOMEM);
65
66         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
67         lu_object_get(&lfsck->li_obj_dir->do_lu);
68         lnr->lnr_obj = lfsck->li_obj_dir;
69         lnr->lnr_fid = ent->lde_fid;
70         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
71         lnr->lnr_dir_cookie = ent->lde_hash;
72         lnr->lnr_attr = ent->lde_attrs;
73         lnr->lnr_size = size;
74         lnr->lnr_type = type;
75         lnr->lnr_namelen = ent->lde_namelen;
76         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
77
78         return lnr;
79 }
80
81 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
82                                                struct lfsck_assistant_req *lar)
83 {
84         struct lfsck_namespace_req *lnr =
85                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
86
87         lu_object_put(env, &lnr->lnr_obj->do_lu);
88         OBD_FREE(lnr, lnr->lnr_size);
89 }
90
91 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
92                                       struct lfsck_namespace *src)
93 {
94         dst->ln_magic = le32_to_cpu(src->ln_magic);
95         dst->ln_status = le32_to_cpu(src->ln_status);
96         dst->ln_flags = le32_to_cpu(src->ln_flags);
97         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
98         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
99         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
100         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
101         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
102         dst->ln_time_last_checkpoint =
103                                 le64_to_cpu(src->ln_time_last_checkpoint);
104         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
105                                  &src->ln_pos_latest_start);
106         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
107                                  &src->ln_pos_last_checkpoint);
108         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
109                                  &src->ln_pos_first_inconsistent);
110         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
111         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
112         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
113         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
114         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
115         dst->ln_objs_repaired_phase2 =
116                                 le64_to_cpu(src->ln_objs_repaired_phase2);
117         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
118         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
119         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
120         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
121                       &src->ln_fid_latest_scanned_phase2);
122         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
123         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
124         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
125         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
126         dst->ln_unknown_inconsistency =
127                                 le64_to_cpu(src->ln_unknown_inconsistency);
128         dst->ln_unmatched_pairs_repaired =
129                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
130         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
131         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
132 }
133
134 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
135                                       struct lfsck_namespace *src)
136 {
137         dst->ln_magic = cpu_to_le32(src->ln_magic);
138         dst->ln_status = cpu_to_le32(src->ln_status);
139         dst->ln_flags = cpu_to_le32(src->ln_flags);
140         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
141         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
142         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
143         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
144         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
145         dst->ln_time_last_checkpoint =
146                                 cpu_to_le64(src->ln_time_last_checkpoint);
147         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
148                                  &src->ln_pos_latest_start);
149         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
150                                  &src->ln_pos_last_checkpoint);
151         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
152                                  &src->ln_pos_first_inconsistent);
153         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
154         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
155         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
156         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
157         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
158         dst->ln_objs_repaired_phase2 =
159                                 cpu_to_le64(src->ln_objs_repaired_phase2);
160         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
161         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
162         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
163         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
164                       &src->ln_fid_latest_scanned_phase2);
165         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
166         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
167         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
168         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
169         dst->ln_unknown_inconsistency =
170                                 cpu_to_le64(src->ln_unknown_inconsistency);
171         dst->ln_unmatched_pairs_repaired =
172                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
173         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
174         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
175 }
176
177 static void lfsck_namespace_record_failure(const struct lu_env *env,
178                                            struct lfsck_instance *lfsck,
179                                            struct lfsck_namespace *ns)
180 {
181         struct lfsck_position pos;
182
183         ns->ln_items_failed++;
184         lfsck_pos_fill(env, lfsck, &pos, false);
185         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
186             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
187                 ns->ln_pos_first_inconsistent = pos;
188
189                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
190                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
191                        lfsck_lfsck2name(lfsck),
192                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
193                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
194                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
195         }
196 }
197
198 /**
199  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
200  * \retval 0: succeed.
201  * \retval -ve: failed cases.
202  */
203 static int lfsck_namespace_load(const struct lu_env *env,
204                                 struct lfsck_component *com)
205 {
206         int len = com->lc_file_size;
207         int rc;
208
209         rc = dt_xattr_get(env, com->lc_obj,
210                           lfsck_buf_get(env, com->lc_file_disk, len),
211                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
212         if (rc == len) {
213                 struct lfsck_namespace *ns = com->lc_file_ram;
214
215                 lfsck_namespace_le_to_cpu(ns,
216                                 (struct lfsck_namespace *)com->lc_file_disk);
217                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
218                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
219                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
220                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
221                         rc = 1;
222                 } else {
223                         rc = 0;
224                 }
225         } else if (rc != -ENODATA) {
226                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
227                        "expected = %d: rc = %d\n",
228                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
229                 if (rc >= 0)
230                         rc = 1;
231         }
232         return rc;
233 }
234
235 static int lfsck_namespace_store(const struct lu_env *env,
236                                  struct lfsck_component *com, bool init)
237 {
238         struct dt_object        *obj    = com->lc_obj;
239         struct lfsck_instance   *lfsck  = com->lc_lfsck;
240         struct thandle          *handle;
241         int                      len    = com->lc_file_size;
242         int                      rc;
243         ENTRY;
244
245         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
246                                   (struct lfsck_namespace *)com->lc_file_ram);
247         handle = dt_trans_create(env, lfsck->li_bottom);
248         if (IS_ERR(handle))
249                 GOTO(log, rc = PTR_ERR(handle));
250
251         rc = dt_declare_xattr_set(env, obj,
252                                   lfsck_buf_get(env, com->lc_file_disk, len),
253                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
254         if (rc != 0)
255                 GOTO(out, rc);
256
257         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
258         if (rc != 0)
259                 GOTO(out, rc);
260
261         rc = dt_xattr_set(env, obj,
262                           lfsck_buf_get(env, com->lc_file_disk, len),
263                           XATTR_NAME_LFSCK_NAMESPACE,
264                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
265                           handle, BYPASS_CAPA);
266
267         GOTO(out, rc);
268
269 out:
270         dt_trans_stop(env, lfsck->li_bottom, handle);
271
272 log:
273         if (rc != 0)
274                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
275                        lfsck_lfsck2name(lfsck), rc);
276         return rc;
277 }
278
279 static int lfsck_namespace_init(const struct lu_env *env,
280                                 struct lfsck_component *com)
281 {
282         struct lfsck_namespace *ns = com->lc_file_ram;
283         int rc;
284
285         memset(ns, 0, sizeof(*ns));
286         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
287         ns->ln_status = LS_INIT;
288         down_write(&com->lc_sem);
289         rc = lfsck_namespace_store(env, com, true);
290         up_write(&com->lc_sem);
291         return rc;
292 }
293
294 /**
295  * Update the namespace LFSCK tracing file for the given @fid
296  *
297  * \param[in] env       pointer to the thread context
298  * \param[in] com       pointer to the lfsck component
299  * \param[in] fid       the fid which flags to be updated in the lfsck
300  *                      tracing file
301  * \param[in] add       true if add new flags, otherwise remove flags
302  *
303  * \retval              0 for succeed or nothing to be done
304  * \retval              negative error number on failure
305  */
306 int lfsck_namespace_trace_update(const struct lu_env *env,
307                                  struct lfsck_component *com,
308                                  const struct lu_fid *fid,
309                                  const __u8 flags, bool add)
310 {
311         struct lfsck_instance   *lfsck  = com->lc_lfsck;
312         struct dt_object        *obj    = com->lc_obj;
313         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
314         struct dt_device        *dev    = lfsck->li_bottom;
315         struct thandle          *th     = NULL;
316         int                      rc     = 0;
317         __u8                     old    = 0;
318         __u8                     new    = 0;
319         ENTRY;
320
321         LASSERT(flags != 0);
322
323         down_write(&com->lc_sem);
324         fid_cpu_to_be(key, fid);
325         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
326                        (const struct dt_key *)key, BYPASS_CAPA);
327         if (rc == -ENOENT) {
328                 if (!add)
329                         GOTO(unlock, rc = 0);
330
331                 old = 0;
332                 new = flags;
333         } else if (rc == 0) {
334                 if (add) {
335                         if ((old & flags) == flags)
336                                 GOTO(unlock, rc = 0);
337
338                         new = old | flags;
339                 } else {
340                         if ((old & flags) == 0)
341                                 GOTO(unlock, rc = 0);
342
343                         new = old & ~flags;
344                 }
345         } else {
346                 GOTO(log, rc);
347         }
348
349         th = dt_trans_create(env, dev);
350         if (IS_ERR(th))
351                 GOTO(log, rc = PTR_ERR(th));
352
353         if (old != 0) {
354                 rc = dt_declare_delete(env, obj,
355                                        (const struct dt_key *)key, th);
356                 if (rc != 0)
357                         GOTO(log, rc);
358         }
359
360         if (new != 0) {
361                 rc = dt_declare_insert(env, obj,
362                                        (const struct dt_rec *)&new,
363                                        (const struct dt_key *)key, th);
364                 if (rc != 0)
365                         GOTO(log, rc);
366         }
367
368         rc = dt_trans_start_local(env, dev, th);
369         if (rc != 0)
370                 GOTO(log, rc);
371
372         if (old != 0) {
373                 rc = dt_delete(env, obj, (const struct dt_key *)key,
374                                th, BYPASS_CAPA);
375                 if (rc != 0)
376                         GOTO(log, rc);
377         }
378
379         if (new != 0) {
380                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
381                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
382                 if (rc != 0)
383                         GOTO(log, rc);
384         }
385
386         GOTO(log, rc);
387
388 log:
389         if (th != NULL && !IS_ERR(th))
390                 dt_trans_stop(env, dev, th);
391
392         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
393                "tracing file, flags %x, old %x, new %x: rc = %d\n",
394                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
395                (__u32)flags, (__u32)old, (__u32)new, rc);
396
397 unlock:
398         up_write(&com->lc_sem);
399
400         return rc;
401 }
402
403 static int lfsck_namespace_check_exist(const struct lu_env *env,
404                                        struct dt_object *dir,
405                                        struct dt_object *obj, const char *name)
406 {
407         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
408         int               rc;
409         ENTRY;
410
411         if (unlikely(lfsck_is_dead_obj(obj)))
412                 RETURN(LFSCK_NAMEENTRY_DEAD);
413
414         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
415                        (const struct dt_key *)name, BYPASS_CAPA);
416         if (rc == -ENOENT)
417                 RETURN(LFSCK_NAMEENTRY_REMOVED);
418
419         if (rc < 0)
420                 RETURN(rc);
421
422         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
423                 RETURN(LFSCK_NAMEENTRY_RECREATED);
424
425         RETURN(0);
426 }
427
428 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
429                                             struct dt_object *obj,
430                                             struct thandle *handle)
431 {
432         int rc;
433
434         /* For destroying all invalid linkEA entries. */
435         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
436         if (rc != 0)
437                 return rc;
438
439         /* For insert new linkEA entry. */
440         rc = dt_declare_xattr_set(env, obj,
441                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
442                         XATTR_NAME_LINK, 0, handle);
443         return rc;
444 }
445
446 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
447                        struct linkea_data *ldata)
448 {
449         int rc;
450
451         if (ldata->ld_buf->lb_buf == NULL)
452                 return -ENOMEM;
453
454         if (!dt_object_exists(obj))
455                 return -ENOENT;
456
457         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
458         if (rc == -ERANGE) {
459                 /* Buf was too small, figure out what we need. */
460                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
461                                   BYPASS_CAPA);
462                 if (rc <= 0)
463                         return rc;
464
465                 lu_buf_realloc(ldata->ld_buf, rc);
466                 if (ldata->ld_buf->lb_buf == NULL)
467                         return -ENOMEM;
468
469                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
470                                   BYPASS_CAPA);
471         }
472
473         if (rc > 0)
474                 rc = linkea_init(ldata);
475
476         return rc;
477 }
478
479 /**
480  * Remove linkEA for the given object.
481  *
482  * The caller should take the ldlm lock before the calling.
483  *
484  * \param[in] env       pointer to the thread context
485  * \param[in] com       pointer to the lfsck component
486  * \param[in] obj       pointer to the dt_object to be handled
487  *
488  * \retval              0 for repaired cases
489  * \retval              negative error number on failure
490  */
491 static int lfsck_namespace_links_remove(const struct lu_env *env,
492                                         struct lfsck_component *com,
493                                         struct dt_object *obj)
494 {
495         struct lfsck_instance           *lfsck  = com->lc_lfsck;
496         struct dt_device                *dev    = lfsck->li_bottom;
497         struct thandle                  *th     = NULL;
498         int                              rc     = 0;
499         ENTRY;
500
501         LASSERT(dt_object_remote(obj) == 0);
502
503         th = dt_trans_create(env, dev);
504         if (IS_ERR(th))
505                 GOTO(log, rc = PTR_ERR(th));
506
507         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
508         if (rc != 0)
509                 GOTO(stop, rc);
510
511         rc = dt_trans_start_local(env, dev, th);
512         if (rc != 0)
513                 GOTO(stop, rc);
514
515         dt_write_lock(env, obj, 0);
516         if (unlikely(lfsck_is_dead_obj(obj)))
517                 GOTO(unlock, rc = -ENOENT);
518
519         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
520                 GOTO(unlock, rc = 0);
521
522         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
523
524         GOTO(unlock, rc);
525
526 unlock:
527         dt_write_unlock(env, obj);
528
529 stop:
530         dt_trans_stop(env, dev, th);
531
532 log:
533         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
534                "for the object "DFID": rc = %d\n",
535                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
536
537         if (rc == 0) {
538                 struct lfsck_namespace *ns = com->lc_file_ram;
539
540                 ns->ln_flags |= LF_INCONSISTENT;
541         }
542
543         return rc;
544 }
545
546 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
547                              struct linkea_data *ldata, struct thandle *handle)
548 {
549         const struct lu_buf *buf = lfsck_buf_get_const(env,
550                                                        ldata->ld_buf->lb_buf,
551                                                        ldata->ld_leh->leh_len);
552
553         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
554                             BYPASS_CAPA);
555 }
556
557 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
558                                                 struct lu_name *cname,
559                                                 struct lu_fid *pfid,
560                                                 char *buf)
561 {
562         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
563         /* To guarantee the 'name' is terminated with '0'. */
564         memcpy(buf, cname->ln_name, cname->ln_namelen);
565         buf[cname->ln_namelen] = 0;
566         cname->ln_name = buf;
567 }
568
569 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
570                                                struct lu_name *cname,
571                                                struct lu_fid *pfid,
572                                                bool remove)
573 {
574         struct link_ea_entry    *oldlee;
575         int                      oldlen;
576         int                      repeated = 0;
577
578         oldlee = ldata->ld_lee;
579         oldlen = ldata->ld_reclen;
580         linkea_next_entry(ldata);
581         while (ldata->ld_lee != NULL) {
582                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
583                                    ldata->ld_lee->lee_reclen[1];
584                 if (unlikely(ldata->ld_reclen == oldlen &&
585                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
586                         repeated++;
587                         if (!remove)
588                                 break;
589
590                         linkea_del_buf(ldata, cname);
591                 } else {
592                         linkea_next_entry(ldata);
593                 }
594         }
595         ldata->ld_lee = oldlee;
596         ldata->ld_reclen = oldlen;
597
598         return repeated;
599 }
600
601 /**
602  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
603  *
604  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
605  * with the given type to generate the name, the detailed rules for name
606  * have been described as following.
607  *
608  * The function also generates the linkEA corresponding to the name entry
609  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
610  *
611  * \param[in] env       pointer to the thread context
612  * \param[in] com       pointer to the lfsck component
613  * \param[in] orphan    pointer to the orphan MDT-object
614  * \param[in] infix     additional information for the orphan name, such as
615  *                      the FID for original
616  * \param[in] type      the type for describing why the orphan MDT-object is
617  *                      created. The rules are as following:
618  *
619  *  type "D":           The MDT-object is a directory, it may know its parent
620  *                      but because there is no valid linkEA, the LFSCK cannot
621  *                      know where to put it back to the namespace.
622  *  type "O":           The MDT-object has no linkEA, and there is no name
623  *                      entry that references the MDT-object.
624  *
625  * \see lfsck_layout_recreate_parent() for more types.
626  *
627  * The orphan name will be like:
628  * ${FID}-${infix}-${type}-${conflict_version}
629  *
630  * \param[out] count    if some others inserted some linkEA entries by race,
631  *                      then return the linkEA entries count.
632  *
633  * \retval              positive number for repaired cases
634  * \retval              0 if needs to repair nothing
635  * \retval              negative error number on failure
636  */
637 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
638                                          struct lfsck_component *com,
639                                          struct dt_object *orphan,
640                                          const char *infix, const char *type,
641                                          int *count)
642 {
643         struct lfsck_thread_info        *info   = lfsck_env_info(env);
644         struct lu_name                  *cname  = &info->lti_name;
645         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
646         struct lu_fid                   *tfid   = &info->lti_fid5;
647         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
648         const struct lu_fid             *pfid;
649         struct lfsck_instance           *lfsck  = com->lc_lfsck;
650         struct dt_device                *dev    = lfsck->li_bottom;
651         struct dt_object                *parent;
652         struct thandle                  *th     = NULL;
653         struct lustre_handle             plh    = { 0 };
654         struct lustre_handle             clh    = { 0 };
655         struct linkea_data               ldata  = { 0 };
656         struct lu_buf                    linkea_buf;
657         int                              namelen;
658         int                              idx    = 0;
659         int                              rc     = 0;
660         bool                             exist  = false;
661         ENTRY;
662
663         cname->ln_name = NULL;
664         /* Create .lustre/lost+found/MDTxxxx when needed. */
665         if (unlikely(lfsck->li_lpf_obj == NULL)) {
666                 rc = lfsck_create_lpf(env, lfsck);
667                 if (rc != 0)
668                         GOTO(log, rc);
669         }
670
671         parent = lfsck->li_lpf_obj;
672         pfid = lfsck_dto2fid(parent);
673
674         /* Hold update lock on the parent to prevent others to access. */
675         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
676                               MDS_INODELOCK_UPDATE, LCK_EX);
677         if (rc != 0)
678                 GOTO(log, rc);
679
680         do {
681                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
682                                    PFID(cfid), infix, type, idx++);
683                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
684                                (const struct dt_key *)info->lti_key,
685                                BYPASS_CAPA);
686                 if (rc != 0 && rc != -ENOENT)
687                         GOTO(log, rc);
688
689                 if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
690                         exist = true;
691         } while (rc == 0 && !exist);
692
693         cname->ln_name = info->lti_key;
694         cname->ln_namelen = namelen;
695         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
696         if (rc != 0)
697                 GOTO(log, rc);
698
699         rc = linkea_add_buf(&ldata, cname, pfid);
700         if (rc != 0)
701                 GOTO(log, rc);
702
703         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
704                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
705                               LCK_EX);
706         if (rc != 0)
707                 GOTO(log, rc);
708
709         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
710                        ldata.ld_leh->leh_len);
711         th = dt_trans_create(env, dev);
712         if (IS_ERR(th))
713                 GOTO(log, rc = PTR_ERR(th));
714
715         if (S_ISDIR(lfsck_object_type(orphan))) {
716                 rc = dt_declare_delete(env, orphan,
717                                        (const struct dt_key *)dotdot, th);
718                 if (rc != 0)
719                         GOTO(stop, rc);
720
721                 rec->rec_type = S_IFDIR;
722                 rec->rec_fid = pfid;
723                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
724                                        (const struct dt_key *)dotdot, th);
725                 if (rc != 0)
726                         GOTO(stop, rc);
727         }
728
729         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
730                                   XATTR_NAME_LINK, 0, th);
731         if (rc != 0)
732                 GOTO(stop, rc);
733
734         if (!exist) {
735                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
736                 rec->rec_fid = cfid;
737                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
738                                        (const struct dt_key *)cname->ln_name,
739                                        th);
740                 if (rc != 0)
741                         GOTO(stop, rc);
742
743                 if (S_ISDIR(rec->rec_type)) {
744                         rc = dt_declare_ref_add(env, parent, th);
745                         if (rc != 0)
746                                 GOTO(stop, rc);
747                 }
748         }
749
750         rc = dt_trans_start_local(env, dev, th);
751         if (rc != 0)
752                 GOTO(stop, rc);
753
754         dt_write_lock(env, orphan, 0);
755         rc = lfsck_links_read(env, orphan, &ldata);
756         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
757                    (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
758                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
759                         GOTO(unlock, rc = 1);
760
761                 if (S_ISDIR(lfsck_object_type(orphan))) {
762                         rc = dt_delete(env, orphan,
763                                        (const struct dt_key *)dotdot, th,
764                                        BYPASS_CAPA);
765                         if (rc != 0)
766                                 GOTO(unlock, rc);
767
768                         rec->rec_type = S_IFDIR;
769                         rec->rec_fid = pfid;
770                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
771                                        (const struct dt_key *)dotdot, th,
772                                        BYPASS_CAPA, 1);
773                         if (rc != 0)
774                                 GOTO(unlock, rc);
775                 }
776
777                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
778                                   th, BYPASS_CAPA);
779         } else {
780                 if (rc == 0 && count != NULL)
781                         *count = ldata.ld_leh->leh_reccount;
782
783                 GOTO(unlock, rc);
784         }
785         dt_write_unlock(env, orphan);
786
787         if (rc == 0 && !exist) {
788                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
789                 rec->rec_fid = cfid;
790                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
791                                (const struct dt_key *)cname->ln_name,
792                                th, BYPASS_CAPA, 1);
793                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
794                         dt_write_lock(env, parent, 0);
795                         rc = dt_ref_add(env, parent, th);
796                         dt_write_unlock(env, parent);
797                 }
798         }
799
800         GOTO(stop, rc = (rc == 0 ? 1 : rc));
801
802 unlock:
803         dt_write_unlock(env, orphan);
804
805 stop:
806         dt_trans_stop(env, dev, th);
807
808 log:
809         lfsck_ibits_unlock(&clh, LCK_EX);
810         lfsck_ibits_unlock(&plh, LCK_EX);
811         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
812                "object "DFID", name = %s: rc = %d\n",
813                lfsck_lfsck2name(lfsck), PFID(cfid),
814                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
815
816         if (rc != 0) {
817                 struct lfsck_namespace *ns = com->lc_file_ram;
818
819                 ns->ln_flags |= LF_INCONSISTENT;
820         }
821
822         return rc;
823 }
824
/*
 * Stub: the real implementation is still to be written (TBD).
 *
 * None of the arguments is examined; the function unconditionally
 * returns 0.
 */
static int lfsck_namespace_insert_normal(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *parent,
                                         struct dt_object *child,
                                         const char *name)
{
        int rc = 0;

        /* XXX: TBD */
        return rc;
}
834
/*
 * Stub: the real implementation is still to be written (TBD).
 *
 * None of the arguments is examined; the function unconditionally
 * returns 0.
 */
static int lfsck_namespace_create_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan)
{
        int rc = 0;

        /* XXX: TBD */
        return rc;
}
842
843 /**
844  * Remove the specified entry from the linkEA.
845  *
846  * Locate the linkEA entry with the given @cname and @pfid, then
847  * remove this entry or the other entries those are repeated with
848  * this entry.
849  *
850  * \param[in] env       pointer to the thread context
851  * \param[in] com       pointer to the lfsck component
852  * \param[in] obj       pointer to the dt_object to be handled
853  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
854  * \param[in] cname     the name for the child in the parent directory
855  * \param[in] pfid      the parent directory's FID for the linkEA
856  * \param[in] next      if true, then remove the first found linkEA
857  *                      entry, and move the ldata->ld_lee to next entry
858  *
859  * \retval              positive number for repaired cases
860  * \retval              0 if nothing to be repaired
861  * \retval              negative error number on failure
862  */
863 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
864                                          struct lfsck_component *com,
865                                          struct dt_object *obj,
866                                          struct linkea_data *ldata,
867                                          struct lu_name *cname,
868                                          struct lu_fid *pfid,
869                                          bool next)
870 {
871         struct lfsck_instance           *lfsck     = com->lc_lfsck;
872         struct dt_device                *dev       = lfsck->li_bottom;
873         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
874         struct thandle                  *th        = NULL;
875         struct lustre_handle             lh        = { 0 };
876         struct linkea_data               ldata_new = { 0 };
877         struct lu_buf                    linkea_buf;
878         int                              rc        = 0;
879         ENTRY;
880
881         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
882                               MDS_INODELOCK_UPDATE |
883                               MDS_INODELOCK_XATTR, LCK_EX);
884         if (rc != 0)
885                 GOTO(log, rc);
886
887         if (next)
888                 linkea_del_buf(ldata, cname);
889         else
890                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
891                                                     true);
892         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
893                        ldata->ld_leh->leh_len);
894
895 again:
896         th = dt_trans_create(env, dev);
897         if (IS_ERR(th))
898                 GOTO(unlock1, rc = PTR_ERR(th));
899
900         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
901                                   XATTR_NAME_LINK, 0, th);
902         if (rc != 0)
903                 GOTO(stop, rc);
904
905         rc = dt_trans_start_local(env, dev, th);
906         if (rc != 0)
907                 GOTO(stop, rc);
908
909         dt_write_lock(env, obj, 0);
910         if (unlikely(lfsck_is_dead_obj(obj)))
911                 GOTO(unlock2, rc = -ENOENT);
912
913         rc = lfsck_links_read2(env, obj, &ldata_new);
914         if (rc != 0)
915                 GOTO(unlock2, rc);
916
917         /* The specified linkEA entry has been removed by race. */
918         rc = linkea_links_find(&ldata_new, cname, pfid);
919         if (rc != 0)
920                 GOTO(unlock2, rc = 0);
921
922         if (bk->lb_param & LPF_DRYRUN)
923                 GOTO(unlock2, rc = 1);
924
925         if (next)
926                 linkea_del_buf(&ldata_new, cname);
927         else
928                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
929                                                     true);
930
931         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
932                 dt_write_unlock(env, obj);
933                 dt_trans_stop(env, dev, th);
934                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
935                                ldata_new.ld_leh->leh_len);
936                 goto again;
937         }
938
939         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
940                        ldata_new.ld_leh->leh_len);
941         rc = dt_xattr_set(env, obj, &linkea_buf,
942                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
943
944         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
945
946 unlock2:
947         dt_write_unlock(env, obj);
948
949 stop:
950         dt_trans_stop(env, dev, th);
951
952 unlock1:
953         lfsck_ibits_unlock(&lh, LCK_EX);
954
955 log:
956         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
957                "for the object: "DFID", parent "DFID", name %.*s\n",
958                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
959                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
960                cname->ln_name);
961
962         if (rc != 0) {
963                 struct lfsck_namespace *ns = com->lc_file_ram;
964
965                 ns->ln_flags |= LF_INCONSISTENT;
966         }
967
968         return rc;
969 }
970
971 /**
972  * Conditionally remove the specified entry from the linkEA.
973  *
974  * Take the parent lock firstly, then check whether the specified
975  * name entry exists or not: if yes, do nothing; otherwise, call
976  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
977  *
978  * \param[in] env       pointer to the thread context
979  * \param[in] com       pointer to the lfsck component
980  * \param[in] parent    pointer to the parent directory
981  * \param[in] child     pointer to the child object that holds the linkEA
982  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
983  * \param[in] cname     the name for the child in the parent directory
984  * \param[in] pfid      the parent directory's FID for the linkEA
985  *
986  * \retval              positive number for repaired cases
987  * \retval              0 if nothing to be repaired
988  * \retval              negative error number on failure
989  */
990 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
991                                               struct lfsck_component *com,
992                                               struct dt_object *parent,
993                                               struct dt_object *child,
994                                               struct linkea_data *ldata,
995                                               struct lu_name *cname,
996                                               struct lu_fid *pfid)
997 {
998         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
999         struct lustre_handle     lh     = { 0 };
1000         int                      rc;
1001         ENTRY;
1002
1003         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1004                               MDS_INODELOCK_UPDATE, LCK_EX);
1005         if (rc != 0)
1006                 RETURN(rc);
1007
1008         dt_read_lock(env, parent, 0);
1009         if (unlikely(lfsck_is_dead_obj(parent))) {
1010                 dt_read_unlock(env, parent);
1011                 lfsck_ibits_unlock(&lh, LCK_EX);
1012                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1013                                                    cname, pfid, true);
1014
1015                 RETURN(rc);
1016         }
1017
1018         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1019                        (const struct dt_key *)cname->ln_name,
1020                        BYPASS_CAPA);
1021         dt_read_unlock(env, parent);
1022
1023         /* It is safe to release the ldlm lock, because when the logic come
1024          * here, we have got all the needed information above whether the
1025          * linkEA entry is valid or not. It is not important that others
1026          * may add new linkEA entry after the ldlm lock released. If other
1027          * has removed the specified linkEA entry by race, then it is OK,
1028          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1029          * such case. */
1030         lfsck_ibits_unlock(&lh, LCK_EX);
1031         if (rc == -ENOENT) {
1032                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1033                                                    cname, pfid, true);
1034
1035                 RETURN(rc);
1036         }
1037
1038         if (rc != 0)
1039                 RETURN(rc);
1040
1041         /* The LFSCK just found some internal status of cross-MDTs
1042          * create operation. That is normal. */
1043         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1044                 linkea_next_entry(ldata);
1045
1046                 RETURN(0);
1047         }
1048
1049         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1050                                            pfid, true);
1051
1052         RETURN(rc);
1053 }
1054
/**
 * Conditionally replace name entry in the parent.
 *
 * As required, the LFSCK may re-create the lost MDT-object for dangling
 * name entry, but such repairing may be wrong because of bad FID in the
 * name entry. As the LFSCK processing, the real MDT-object may be found,
 * then the LFSCK should check whether the former re-created MDT-object
 * has been modified or not, if not, then destroy it and update the name
 * entry in the parent to reference the real MDT-object.
 *
 * The object named by @cfid is kept (and nothing is replaced) if the name
 * entry in the parent no longer references @cfid, if its ctime is non-zero,
 * or if it is a regular file that already has a LOV xattr. The replacement
 * is also skipped if the @child's linkEA cannot be read (-ENODATA) or does
 * not contain the pair (@cname, FID of @parent).
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] parent    pointer to the parent directory
 * \param[in] child     pointer to the MDT-object that may be the real
 *                      MDT-object corresponding to the name entry in parent
 * \param[in] cfid      the current FID in the name entry
 * \param[in] cname     contains the name of the child in the parent directory
 *
 * \retval              positive number for repaired cases
 * \retval              0 if nothing to be repaired
 * \retval              negative error number on failure
 */
static int lfsck_namespace_replace_cond(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct dt_object *parent,
                                        struct dt_object *child,
                                        const struct lu_fid *cfid,
                                        const struct lu_name *cname)
{
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct lu_fid                   *tfid   = &info->lti_fid5;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck->li_next;
        const char                      *name   = cname->ln_name;
        struct dt_object                *obj    = NULL;
        struct lustre_handle             plh    = { 0 };
        struct lustre_handle             clh    = { 0 };
        struct linkea_data               ldata  = { 0 };
        struct thandle                  *th     = NULL;
        /* Whether the object named by @cfid exists and must be destroyed
         * as part of the replacement. */
        bool                             exist  = true;
        int                              rc     = 0;
        ENTRY;

        rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
                              MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
                GOTO(log, rc);

        /* An insane FID cannot reference any object: nothing to destroy. */
        if (!fid_is_sane(cfid)) {
                exist = false;
                goto replace;
        }

        obj = lfsck_object_find(env, lfsck, cfid);
        if (IS_ERR(obj)) {
                rc = PTR_ERR(obj);
                if (rc == -ENOENT) {
                        exist = false;
                        goto replace;
                }

                GOTO(log, rc);
        }

        if (!dt_object_exists(obj)) {
                exist = false;
                goto replace;
        }

        /* Check which FID the name entry references right now. */
        rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                exist = false;
                goto replace;
        }

        if (rc != 0)
                GOTO(log, rc);

        /* Someone changed the name entry, cannot replace it. */
        if (!lu_fid_eq(cfid, tfid))
                GOTO(log, rc = 0);

        /* lock the object to be destroyed. */
        rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
                              MDS_INODELOCK_UPDATE |
                              MDS_INODELOCK_XATTR, LCK_EX);
        if (rc != 0)
                GOTO(log, rc);

        if (unlikely(lfsck_is_dead_obj(obj))) {
                exist = false;
                goto replace;
        }

        rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
        if (rc != 0)
                GOTO(log, rc);

        /* The object has been modified by other(s), or it is not created by
         * LFSCK, the two cases are indistinguishable. So cannot replace it.
         * rc is 0 at this point, so "nothing to be repaired" is reported. */
        if (la->la_ctime != 0)
                GOTO(log, rc);

        if (S_ISREG(la->la_mode)) {
                rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
                                  BYPASS_CAPA);
                /* If someone has created related OST-object(s),
                 * then keep it. A positive value is mapped to 0, -ENODATA
                 * falls through to the replacement, and any other error
                 * is returned as is. */
                if ((rc > 0) || (rc < 0 && rc != -ENODATA))
                        GOTO(log, rc = (rc > 0 ? 0 : rc));
        }

replace:
        /* Verify that the @child really claims the name (@cname) under the
         * @parent in its linkEA before touching the name entry. */
        dt_read_lock(env, child, 0);
        rc = lfsck_links_read2(env, child, &ldata);
        dt_read_unlock(env, child);

        /* Someone changed the child, no need to replace. */
        if (rc == -ENODATA)
                GOTO(log, rc = 0);

        if (rc != 0)
                GOTO(log, rc);

        rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
        /* Someone moved the child, no need to replace. */
        if (rc != 0)
                GOTO(log, rc = 0);

        /* Dryrun: report that a repair is needed without modifying. */
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                GOTO(log, rc = 1);

        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                GOTO(log, rc = PTR_ERR(th));

        if (exist) {
                rc = dt_declare_destroy(env, obj, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }

        rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* NOTE(review): the record type of the new name entry is hard-coded
         * as S_IFDIR; presumably all callers pass a directory as @child -
         * confirm against the callers. */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = lfsck_dto2fid(child);
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        if (exist) {
                rc = dt_destroy(env, obj, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }

        /* The old name entry maybe not exist, so the result of the
         * deletion is ignored on purpose. */
        dt_delete(env, parent, (const struct dt_key *)name, th,
                  BYPASS_CAPA);

        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);

        /* A successful insertion is reported as "repaired" (1). */
        GOTO(stop, rc = (rc == 0 ? 1 : rc));

stop:
        dt_trans_stop(env, dev, th);

log:
        /* Both handles start zeroed; the unlock calls are issued on every
         * exit path, including the ones where a lock was never taken. */
        lfsck_ibits_unlock(&clh, LCK_EX);
        lfsck_ibits_unlock(&plh, LCK_EX);
        /* obj may still hold the error pointer from the failed lookup. */
        if (obj != NULL && !IS_ERR(obj))
                lfsck_object_put(env, obj);

        CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
               "object "DFID" because of conflict with the object "DFID
               " under the parent "DFID" with name %s: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(cfid),
               PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
               name, rc);

        return rc;
}
1248
1249 /**
1250  * Overwrite the linkEA for the object with the given ldata.
1251  *
1252  * The caller should take the ldlm lock before the calling.
1253  *
1254  * \param[in] env       pointer to the thread context
1255  * \param[in] com       pointer to the lfsck component
1256  * \param[in] obj       pointer to the dt_object to be handled
1257  * \param[in] ldata     pointer to the new linkEA data
1258  *
1259  * \retval              positive number for repaired cases
1260  * \retval              0 if nothing to be repaired
1261  * \retval              negative error number on failure
1262  */
1263 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1264                                    struct lfsck_component *com,
1265                                    struct dt_object *obj,
1266                                    struct linkea_data *ldata)
1267 {
1268         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1269         struct dt_device                *dev    = lfsck->li_bottom;
1270         struct thandle                  *th     = NULL;
1271         struct lu_buf                    linkea_buf;
1272         int                              rc     = 0;
1273         ENTRY;
1274
1275         LASSERT(!dt_object_remote(obj));
1276
1277         th = dt_trans_create(env, dev);
1278         if (IS_ERR(th))
1279                 GOTO(log, rc = PTR_ERR(th));
1280
1281         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1282                        ldata->ld_leh->leh_len);
1283         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1284                                   XATTR_NAME_LINK, 0, th);
1285         if (rc != 0)
1286                 GOTO(stop, rc);
1287
1288         rc = dt_trans_start_local(env, dev, th);
1289         if (rc != 0)
1290                 GOTO(stop, rc);
1291
1292         dt_write_lock(env, obj, 0);
1293         if (unlikely(lfsck_is_dead_obj(obj)))
1294                 GOTO(unlock, rc = 0);
1295
1296         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1297                 GOTO(unlock, rc = 1);
1298
1299         rc = dt_xattr_set(env, obj, &linkea_buf,
1300                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1301
1302         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1303
1304 unlock:
1305         dt_write_unlock(env, obj);
1306
1307 stop:
1308         dt_trans_stop(env, dev, th);
1309
1310 log:
1311         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1312                "object "DFID": rc = %d\n",
1313                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1314
1315         if (rc != 0) {
1316                 struct lfsck_namespace *ns = com->lc_file_ram;
1317
1318                 ns->ln_flags |= LF_INCONSISTENT;
1319         }
1320
1321         return rc;
1322 }
1323
1324 /**
1325  * Update the ".." name entry for the given object.
1326  *
1327  * The object's ".." is corrupted, this function will update the ".." name
1328  * entry with the given pfid, and the linkEA with the given ldata.
1329  *
1330  * The caller should take the ldlm lock before the calling.
1331  *
1332  * \param[in] env       pointer to the thread context
1333  * \param[in] com       pointer to the lfsck component
1334  * \param[in] obj       pointer to the dt_object to be handled
1335  * \param[in] pfid      the new fid for the object's ".." name entry
1336  * \param[in] cname     the name for the @obj in the parent directory
1337  *
1338  * \retval              positive number for repaired cases
1339  * \retval              0 if nothing to be repaired
1340  * \retval              negative error number on failure
1341  */
1342 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
1343                                                   struct lfsck_component *com,
1344                                                   struct dt_object *obj,
1345                                                   const struct lu_fid *pfid,
1346                                                   struct lu_name *cname)
1347 {
1348         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1349         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1350         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1351         struct dt_device                *dev    = lfsck->li_bottom;
1352         struct thandle                  *th     = NULL;
1353         struct linkea_data               ldata  = { 0 };
1354         struct lu_buf                    linkea_buf;
1355         int                              rc     = 0;
1356         ENTRY;
1357
1358         LASSERT(!dt_object_remote(obj));
1359         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1360
1361         rc = linkea_data_new(&ldata, &info->lti_big_buf);
1362         if (rc != 0)
1363                 GOTO(log, rc);
1364
1365         rc = linkea_add_buf(&ldata, cname, pfid);
1366         if (rc != 0)
1367                 GOTO(log, rc);
1368
1369         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1370                        ldata.ld_leh->leh_len);
1371
1372         th = dt_trans_create(env, dev);
1373         if (IS_ERR(th))
1374                 GOTO(log, rc = PTR_ERR(th));
1375
1376         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
1377         if (rc != 0)
1378                 GOTO(stop, rc);
1379
1380         rec->rec_type = S_IFDIR;
1381         rec->rec_fid = pfid;
1382         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
1383                                (const struct dt_key *)dotdot, th);
1384         if (rc != 0)
1385                 GOTO(stop, rc);
1386
1387         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1388                                   XATTR_NAME_LINK, 0, th);
1389         if (rc != 0)
1390                 GOTO(stop, rc);
1391
1392         rc = dt_trans_start_local(env, dev, th);
1393         if (rc != 0)
1394                 GOTO(stop, rc);
1395
1396         dt_write_lock(env, obj, 0);
1397         if (unlikely(lfsck_is_dead_obj(obj)))
1398                 GOTO(unlock, rc = 0);
1399
1400         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1401                 GOTO(unlock, rc = 1);
1402
1403         /* The old ".." name entry maybe not exist. */
1404         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
1405                   BYPASS_CAPA);
1406
1407         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
1408                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
1409         if (rc != 0)
1410                 GOTO(unlock, rc);
1411
1412         rc = dt_xattr_set(env, obj, &linkea_buf,
1413                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1414
1415         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1416
1417 unlock:
1418         dt_write_unlock(env, obj);
1419
1420 stop:
1421         dt_trans_stop(env, dev, th);
1422
1423 log:
1424         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
1425                "the object "DFID", new parent "DFID": rc = %d\n",
1426                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1427                PFID(pfid), rc);
1428
1429         if (rc != 0) {
1430                 struct lfsck_namespace *ns = com->lc_file_ram;
1431
1432                 ns->ln_flags |= LF_INCONSISTENT;
1433         }
1434
1435         return rc;
1436 }
1437
1438 /**
1439  * Handle orphan @obj during Double Scan Directory.
1440  *
1441  * Remove the @obj's current (invalid) linkEA entries, and insert
1442  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
1443  * ${FID}-${PFID}-D-${conflict_version}
1444  *
1445  * The caller should take the ldlm lock before the calling.
1446  *
1447  * \param[in] env       pointer to the thread context
1448  * \param[in] com       pointer to the lfsck component
1449  * \param[in] obj       pointer to the orphan object to be handled
1450  * \param[in] pfid      the new fid for the object's ".." name entry
1451  * \param[in,out] lh    ldlm lock handler for the given @obj
1452  * \param[out] type     to tell the caller what the inconsistency is
1453  *
1454  * \retval              positive number for repaired cases
1455  * \retval              0 if nothing to be repaired
1456  * \retval              negative error number on failure
1457  */
1458 static int
1459 lfsck_namespace_dsd_orphan(const struct lu_env *env,
1460                            struct lfsck_component *com,
1461                            struct dt_object *obj,
1462                            const struct lu_fid *pfid,
1463                            struct lustre_handle *lh,
1464                            enum lfsck_namespace_inconsistency_type *type)
1465 {
1466         struct lfsck_thread_info *info = lfsck_env_info(env);
1467         int                       rc;
1468         ENTRY;
1469
1470         /* Remove the unrecognized linkEA. */
1471         rc = lfsck_namespace_links_remove(env, com, obj);
1472         lfsck_ibits_unlock(lh, LCK_EX);
1473         if (rc < 0 && rc != -ENODATA)
1474                 RETURN(rc);
1475
1476         *type = LNIT_MUL_REF;
1477         /* The unique linkEA is invalid, even if the ".." name entry may be
1478          * valid, we still cannot know via which name entry this directory
1479          * will be referenced. Then handle it as pure orphan. */
1480         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1481                  "-"DFID, PFID(pfid));
1482         rc = lfsck_namespace_insert_orphan(env, com, obj,
1483                                            info->lti_tmpbuf, "D", NULL);
1484
1485         RETURN(rc);
1486 }
1487
1488 /**
1489  * Double Scan Directory object for single linkEA entry case.
1490  *
1491  * The given @child has unique linkEA entry. If the linkEA entry is valid,
1492  * then check whether the name is in the namespace or not, if not, add the
1493  * missing name entry back to namespace. If the linkEA entry is invalid,
1494  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
1495  * as an orphan.
1496  *
1497  * \param[in] env       pointer to the thread context
1498  * \param[in] com       pointer to the lfsck component
1499  * \param[in] child     pointer to the directory to be double scanned
1500  * \param[in] pfid      the FID corresponding to the ".." entry
1501  * \param[in] ldata     pointer to the linkEA data for the given @child
1502  * \param[in,out] lh    ldlm lock handler for the given @child
1503  * \param[out] type     to tell the caller what the inconsistency is
1504  * \param[in] retry     if found inconsistency, but the caller does not hold
1505  *                      ldlm lock on the @child, then set @retry as true
1506  *
1507  * \retval              positive number for repaired cases
1508  * \retval              0 if nothing to be repaired
1509  * \retval              negative error number on failure
1510  */
1511 static int
1512 lfsck_namespace_dsd_single(const struct lu_env *env,
1513                            struct lfsck_component *com,
1514                            struct dt_object *child,
1515                            const struct lu_fid *pfid,
1516                            struct linkea_data *ldata,
1517                            struct lustre_handle *lh,
1518                            enum lfsck_namespace_inconsistency_type *type,
1519                            bool *retry)
1520 {
1521         struct lfsck_thread_info *info          = lfsck_env_info(env);
1522         struct lu_name           *cname         = &info->lti_name;
1523         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1524         struct lu_fid            *tfid          = &info->lti_fid3;
1525         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1526         struct dt_object         *parent        = NULL;
1527         int                       rc            = 0;
1528         ENTRY;
1529
1530         lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
1531         /* The unique linkEA entry with bad parent will be handled as orphan. */
1532         if (!fid_is_sane(tfid)) {
1533                 if (!lustre_handle_is_used(lh) && retry != NULL)
1534                         *retry = true;
1535                 else
1536                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1537                                                         pfid, lh, type);
1538
1539                 GOTO(out, rc);
1540         }
1541
1542         parent = lfsck_object_find_bottom(env, lfsck, tfid);
1543         if (IS_ERR(parent))
1544                 GOTO(out, rc = PTR_ERR(parent));
1545
1546         /* We trust the unique linkEA entry in spite of whether it matches the
1547          * ".." name entry or not. Because even if the linkEA entry is wrong
1548          * and the ".." name entry is right, we still cannot know via which
1549          * name entry the child will be referenced, since all known entries
1550          * have been verified during the first-stage scanning. */
1551         if (!dt_object_exists(parent)) {
1552                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1553                         *retry = true;
1554
1555                         GOTO(out, rc = 0);
1556                 }
1557
1558                 lfsck_ibits_unlock(lh, LCK_EX);
1559                 /* Create the lost parent as an orphan. */
1560                 rc = lfsck_namespace_create_orphan(env, com, parent);
1561                 if (rc >= 0)
1562                         /* Add the missing name entry to the parent. */
1563                         rc = lfsck_namespace_insert_normal(env, com, parent,
1564                                                         child, cname->ln_name);
1565
1566                 GOTO(out, rc);
1567         }
1568
1569         /* The unique linkEA entry with bad parent will be handled as orphan. */
1570         if (unlikely(!dt_try_as_dir(env, parent))) {
1571                 if (!lustre_handle_is_used(lh) && retry != NULL)
1572                         *retry = true;
1573                 else
1574                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1575                                                         pfid, lh, type);
1576
1577                 GOTO(out, rc);
1578         }
1579
1580         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1581                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
1582         if (rc == -ENOENT) {
1583                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1584                         *retry = true;
1585
1586                         GOTO(out, rc = 0);
1587                 }
1588
1589                 lfsck_ibits_unlock(lh, LCK_EX);
1590                 /* Add the missing name entry back to the namespace. */
1591                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
1592                                                    cname->ln_name);
1593
1594                 GOTO(out, rc);
1595         }
1596
1597         if (rc != 0)
1598                 GOTO(out, rc);
1599
1600         if (!lu_fid_eq(tfid, cfid)) {
1601                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1602                         *retry = true;
1603
1604                         GOTO(out, rc = 0);
1605                 }
1606
1607                 lfsck_ibits_unlock(lh, LCK_EX);
1608                 /* The name entry references another MDT-object that
1609                  * may be created by the LFSCK for repairing dangling
1610                  * name entry. Try to replace it. */
1611                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
1612                                                   tfid, cname);
1613                 if (rc == 0)
1614                         rc = lfsck_namespace_dsd_orphan(env, com, child,
1615                                                         pfid, lh, type);
1616
1617                 GOTO(out, rc);
1618         }
1619
1620         /* The ".." name entry is wrong, update it. */
1621         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
1622                 if (!lustre_handle_is_used(lh) && retry != NULL) {
1623                         *retry = true;
1624
1625                         GOTO(out, rc = 0);
1626                 }
1627
1628                 *type = LNIT_UNMATCHED_PAIRS;
1629                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
1630                                                 lfsck_dto2fid(parent), cname);
1631         }
1632
1633         GOTO(out, rc);
1634
1635 out:
1636         if (parent != NULL && !IS_ERR(parent))
1637                 lfsck_object_put(env, parent);
1638
1639         return rc;
1640 }
1641
1642 /**
1643  * Double Scan Directory object for multiple linkEA entries case.
1644  *
1645  * The given @child has multiple linkEA entries. There is at most one linkEA
1646  * entry will be valid, all the others will be removed. Firstly, the function
1647  * will try to find out the linkEA entry for which the name entry exists under
1648  * the given parent (@pfid). If there is no linkEA entry that matches the given
1649  * ".." name entry, then tries to find out the first linkEA entry that both the
1650  * parent and the name entry exist to rebuild a new ".." name entry.
1651  *
1652  * \param[in] env       pointer to the thread context
1653  * \param[in] com       pointer to the lfsck component
1654  * \param[in] child     pointer to the directory to be double scanned
1655  * \param[in] pfid      the FID corresponding to the ".." entry
1656  * \param[in] ldata     pointer to the linkEA data for the given @child
1657  * \param[in,out] lh    ldlm lock handler for the given @child
1658  * \param[out] type     to tell the caller what the inconsistency is
1659  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
1660  *
1661  * \retval              positive number for repaired cases
1662  * \retval              0 if nothing to be repaired
1663  * \retval              negative error number on failure
1664  */
1665 static int
1666 lfsck_namespace_dsd_multiple(const struct lu_env *env,
1667                              struct lfsck_component *com,
1668                              struct dt_object *child,
1669                              const struct lu_fid *pfid,
1670                              struct linkea_data *ldata,
1671                              struct lustre_handle *lh,
1672                              enum lfsck_namespace_inconsistency_type *type,
1673                              bool lpf)
1674 {
1675         struct lfsck_thread_info *info          = lfsck_env_info(env);
1676         struct lu_name           *cname         = &info->lti_name;
1677         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1678         struct lu_fid            *tfid          = &info->lti_fid3;
1679         struct lu_fid            *pfid2         = &info->lti_fid4;
1680         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1681         struct dt_object         *parent        = NULL;
1682         struct linkea_data        ldata_new     = { 0 };
1683         int                       rc            = 0;
1684         bool                      once          = true;
1685         ENTRY;
1686
1687 again:
1688         while (ldata->ld_lee != NULL) {
1689                 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
1690                                                     info->lti_key);
1691                 /* Drop repeated linkEA entries. */
1692                 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
1693                 /* Drop invalid linkEA entry. */
1694                 if (!fid_is_sane(tfid)) {
1695                         linkea_del_buf(ldata, cname);
1696                         continue;
1697                 }
1698
1699                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
1700                  * then it is possible that: the directry object has ever
1701                  * been lost, but its name entry was there. In the former
1702                  * LFSCK run, during the first-stage scanning, the LFSCK
1703                  * found the dangling name entry, but it did not recreate
1704                  * the lost object, and when moved to the second-stage
1705                  * scanning, some children objects of the lost directory
1706                  * object were found, then the LFSCK recreated such lost
1707                  * directory object as an orphan.
1708                  *
1709                  * When the LFSCK runs again, if the dangling name is still
1710                  * there, the LFSCK should move the orphan directory object
1711                  * back to the normal namespace. */
1712                 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
1713                         linkea_next_entry(ldata);
1714                         continue;
1715                 }
1716
1717                 parent = lfsck_object_find_bottom(env, lfsck, tfid);
1718                 if (IS_ERR(parent))
1719                         RETURN(PTR_ERR(parent));
1720
1721                 if (!dt_object_exists(parent)) {
1722                         lfsck_object_put(env, parent);
1723                         if (ldata->ld_leh->leh_reccount > 1) {
1724                                 /* If it is NOT the last linkEA entry, then
1725                                  * there is still other chance to make the
1726                                  * child to be visible via other parent, then
1727                                  * remove this linkEA entry. */
1728                                 linkea_del_buf(ldata, cname);
1729                                 continue;
1730                         }
1731
1732                         break;
1733                 }
1734
1735                 /* The linkEA entry with bad parent will be removed. */
1736                 if (unlikely(!dt_try_as_dir(env, parent))) {
1737                         lfsck_object_put(env, parent);
1738                         linkea_del_buf(ldata, cname);
1739                         continue;
1740                 }
1741
1742                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1743                                (const struct dt_key *)cname->ln_name,
1744                                BYPASS_CAPA);
1745                 *pfid2 = *lfsck_dto2fid(parent);
1746                 if (rc == -ENOENT) {
1747                         lfsck_object_put(env, parent);
1748                         linkea_next_entry(ldata);
1749                         continue;
1750                 }
1751
1752                 if (rc != 0) {
1753                         lfsck_object_put(env, parent);
1754
1755                         RETURN(rc);
1756                 }
1757
1758                 if (lu_fid_eq(tfid, cfid)) {
1759                         lfsck_object_put(env, parent);
1760                         if (!lu_fid_eq(pfid, pfid2)) {
1761                                 *type = LNIT_UNMATCHED_PAIRS;
1762                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
1763                                                 com, child, pfid2, cname);
1764
1765                                 RETURN(rc);
1766                         }
1767
1768 rebuild:
1769                         /* It is the most common case that we find the
1770                          * name entry corresponding to the linkEA entry
1771                          * that matches the ".." name entry. */
1772                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
1773                         if (rc != 0)
1774                                 RETURN(rc);
1775
1776                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
1777                         if (rc != 0)
1778                                 RETURN(rc);
1779
1780                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
1781                                                             &ldata_new);
1782
1783                         /* XXX: there will be other patch. */
1784
1785                         RETURN(rc);
1786                 }
1787
1788                 lfsck_ibits_unlock(lh, LCK_EX);
1789                 /* The name entry references another MDT-object that may be
1790                  * created by the LFSCK for repairing dangling name entry.
1791                  * Try to replace it. */
1792                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
1793                                                   tfid, cname);
1794                 lfsck_object_put(env, parent);
1795                 if (rc < 0)
1796                         RETURN(rc);
1797
1798                 if (rc > 0)
1799                         goto rebuild;
1800
1801                 linkea_del_buf(ldata, cname);
1802         }
1803
1804         if (ldata->ld_leh->leh_reccount == 1) {
1805                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
1806                                                 lh, type, NULL);
1807
1808                 RETURN(rc);
1809         }
1810
1811         /* All linkEA entries are invalid and removed, then handle the @child
1812          * as an orphan.*/
1813         if (ldata->ld_leh->leh_reccount == 0) {
1814                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
1815                                                 type);
1816
1817                 RETURN(rc);
1818         }
1819
1820         linkea_first_entry(ldata);
1821         /* If the dangling name entry for the orphan directory object has
1822          * been remvoed, then just check whether the directory object is
1823          * still under the .lustre/lost+found/MDTxxxx/ or not. */
1824         if (lpf) {
1825                 lpf = false;
1826                 goto again;
1827         }
1828
1829         /* There is no linkEA entry that matches the ".." name entry. Find
1830          * the first linkEA entry that both parent and name entry exist to
1831          * rebuild a new ".." name entry. */
1832         if (once) {
1833                 once = false;
1834                 goto again;
1835         }
1836
1837         RETURN(rc);
1838 }
1839
1840 /**
1841  * Double scan the directory object for namespace LFSCK.
1842  *
1843  * This function will verify the <parent, child> pairs in the namespace tree:
1844  * the parent references the child via some name entry that should be in the
1845  * child's linkEA entry, the child should back references the parent via its
1846  * ".." name entry.
1847  *
1848  * The LFSCK will scan every linkEA entry in turn until find out the first
1849  * matched pairs. If found, then all other linkEA entries will be dropped.
1850  * If all the linkEA entries cannot match the ".." name entry, then there
1851  * are serveral possible cases:
1852  *
1853  * 1) If there is only one linkEA entry, then trust it as long as the PFID
1854  *    in the linkEA entry is valid.
1855  *
1856  * 2) If there are multiple linkEA entries, then try to find the linkEA
1857  *    that matches the ".." name entry. If found, then all other entries
1858  *    are invalid; otherwise, it is quite possible that the ".." name entry
1859  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
1860  *    entry according to the first valid linkEA entry (both the parent and
1861  *    the name entry should exist).
1862  *
1863  * 3) If the directory object has no (valid) linkEA entry, then the
1864  *    directory object will be handled as pure orphan and inserted
1865  *    in the .lustre/lost+found/MDTxxxx/ with the name:
1866  *    ${self_FID}-${PFID}-D-${conflict_version}
1867  *
1868  * \param[in] env       pointer to the thread context
1869  * \param[in] com       pointer to the lfsck component
1870  * \param[in] child     pointer to the directory object to be handled
1871  * \param[in] flags     to indicate the specical checking on the @child
1872  *
1873  * \retval              positive number for repaired cases
1874  * \retval              0 if nothing to be repaired
1875  * \retval              negative error number on failure
1876  */
1877 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
1878                                            struct lfsck_component *com,
1879                                            struct dt_object *child, __u8 flags)
1880 {
1881         struct lfsck_thread_info *info          = lfsck_env_info(env);
1882         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
1883         struct lu_fid            *pfid          = &info->lti_fid2;
1884         struct lfsck_namespace   *ns            = com->lc_file_ram;
1885         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1886         struct lustre_handle      lh            = { 0 };
1887         struct linkea_data        ldata         = { 0 };
1888         bool                      unknown       = false;
1889         bool                      lpf           = false;
1890         bool                      retry         = false;
1891         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
1892         int                       rc            = 0;
1893         ENTRY;
1894
1895         LASSERT(!dt_object_remote(child));
1896
1897         if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
1898                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
1899                        "the namespace LFSCK, then the LFSCK cannot guarantee"
1900                        "all the name entries have been verified in first-stage"
1901                        "scanning. So have to skip orphan related handling for"
1902                        "the directory object "DFID" with remote name entry\n",
1903                        lfsck_lfsck2name(lfsck), PFID(cfid));
1904
1905                 RETURN(0);
1906         }
1907
1908         if (unlikely(!dt_try_as_dir(env, child)))
1909                 GOTO(out, rc = -ENOTDIR);
1910
1911         /* We only take ldlm lock on the @child when required. When the
1912          * logic comes here for the first time, it is always false. */
1913         if (0) {
1914
1915 lock:
1916                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1917                                       MDS_INODELOCK_UPDATE |
1918                                       MDS_INODELOCK_XATTR, LCK_EX);
1919                 if (rc != 0)
1920                         GOTO(out, rc);
1921         }
1922
1923         dt_read_lock(env, child, 0);
1924         if (unlikely(lfsck_is_dead_obj(child))) {
1925                 dt_read_unlock(env, child);
1926
1927                 GOTO(out, rc = 0);
1928         }
1929
1930         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
1931                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1932         if (rc != 0) {
1933                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
1934                         dt_read_unlock(env, child);
1935
1936                         GOTO(out, rc);
1937                 }
1938
1939                 if (!lustre_handle_is_used(&lh)) {
1940                         dt_read_unlock(env, child);
1941                         goto lock;
1942                 }
1943
1944                 fid_zero(pfid);
1945         } else if (lfsck->li_lpf_obj != NULL &&
1946                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
1947                 lpf = true;
1948         }
1949
1950         rc = lfsck_links_read(env, child, &ldata);
1951         dt_read_unlock(env, child);
1952         if (rc != 0) {
1953                 if (rc != -ENODATA && rc != -EINVAL)
1954                         GOTO(out, rc);
1955
1956                 if (!lustre_handle_is_used(&lh))
1957                         goto lock;
1958
1959                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
1960                         /* Remove the corrupted linkEA. */
1961                         rc = lfsck_namespace_links_remove(env, com, child);
1962                         if (rc == 0)
1963                                 /* Here, because of the crashed linkEA, we
1964                                  * cannot know whether there is some parent
1965                                  * that references the child directory via
1966                                  * some name entry or not. So keep it there,
1967                                  * when the LFSCK run next time, if there is
1968                                  * some parent that references this object,
1969                                  * then the LFSCK can rebuild the linkEA;
1970                                  * otherwise, this object will be handled
1971                                  * as orphan as above. */
1972                                 unknown = true;
1973                 } else {
1974                         /* 1. If we have neither ".." nor linkEA,
1975                          *    then it is an orphan.
1976                          *
1977                          * 2. If we only have the ".." name entry,
1978                          *    but no parent references this child
1979                          *    directory, then handle it as orphan. */
1980                         lfsck_ibits_unlock(&lh, LCK_EX);
1981                         type = LNIT_MUL_REF;
1982                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1983                                  "-"DFID, PFID(pfid));
1984                         rc = lfsck_namespace_insert_orphan(env, com, child,
1985                                                 info->lti_tmpbuf, "D", NULL);
1986                 }
1987
1988                 GOTO(out, rc);
1989         }
1990
1991         linkea_first_entry(&ldata);
1992         /* This is the most common case: the object has unique linkEA entry. */
1993         if (ldata.ld_leh->leh_reccount == 1) {
1994                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
1995                                                 &lh, &type, &retry);
1996                 if (retry) {
1997                         LASSERT(!lustre_handle_is_used(&lh));
1998
1999                         retry = false;
2000                         goto lock;
2001                 }
2002
2003                 GOTO(out, rc);
2004         }
2005
2006         if (!lustre_handle_is_used(&lh))
2007                 goto lock;
2008
2009         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
2010                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
2011                                                 &type);
2012
2013                 GOTO(out, rc);
2014         }
2015
2016         /* When we come here, the cases usually like that:
2017          * 1) The directory object has a corrupted linkEA entry. During the
2018          *    first-stage scanning, the LFSCK cannot know such corruption,
2019          *    then it appends the right linkEA entry according to the found
2020          *    name entry after the bad one.
2021          *
2022          * 2) The directory object has a right linkEA entry. During the
2023          *    first-stage scanning, the LFSCK finds some bad name entry,
2024          *    but the LFSCK cannot aware that at that time, then it adds
2025          *    the bad linkEA entry for further processing. */
2026         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
2027                                           &lh, &type, lpf);
2028
2029         GOTO(out, rc);
2030
2031 out:
2032         lfsck_ibits_unlock(&lh, LCK_EX);
2033         if (rc > 0) {
2034                 switch (type) {
2035                 case LNIT_BAD_LINKEA:
2036                         ns->ln_linkea_repaired++;
2037                         break;
2038                 case LNIT_UNMATCHED_PAIRS:
2039                         ns->ln_unmatched_pairs_repaired++;
2040                         break;
2041                 case LNIT_MUL_REF:
2042                         ns->ln_mul_ref_repaired++;
2043                         break;
2044                 default:
2045                         break;
2046                 }
2047         }
2048
2049         if (unknown)
2050                 ns->ln_unknown_inconsistency++;
2051
2052         return rc;
2053 }
2054
2055 /**
2056  * Double scan the MDT-object for namespace LFSCK.
2057  *
2058  * If the MDT-object contains invalid or repeated linkEA entries, then drop
2059  * those entries from the linkEA; if the linkEA becomes empty or the object
2060  * has no linkEA, then it is an orphan and will be added into the directory
2061  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
2062  * the remote parent; if the name entry corresponding to some linkEA entry
2063  * is lost, then add the name entry back to the namespace.
2064  *
2065  * \param[in] env       pointer to the thread context
2066  * \param[in] com       pointer to the lfsck component
2067  * \param[in] child     pointer to the dt_object to be handled
2068  * \param[in] flags     some hints to indicate how the @child should be handled
2069  *
2070  * \retval              positive number for repaired cases
2071  * \retval              0 if nothing to be repaired
2072  * \retval              negative error number on failure
2073  */
2074 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
2075                                            struct lfsck_component *com,
2076                                            struct dt_object *child, __u8 flags)
2077 {
2078         struct lfsck_thread_info *info     = lfsck_env_info(env);
2079         struct lu_attr           *la       = &info->lti_la;
2080         struct lu_name           *cname    = &info->lti_name;
2081         struct lu_fid            *pfid     = &info->lti_fid;
2082         struct lu_fid            *cfid     = &info->lti_fid2;
2083         struct lfsck_instance    *lfsck    = com->lc_lfsck;
2084         struct lfsck_namespace   *ns       = com->lc_file_ram;
2085         struct dt_object         *parent   = NULL;
2086         struct linkea_data        ldata    = { 0 };
2087         bool                      repaired = false;
2088         int                       count    = 0;
2089         int                       rc;
2090         ENTRY;
2091
2092         dt_read_lock(env, child, 0);
2093         if (unlikely(lfsck_is_dead_obj(child))) {
2094                 dt_read_unlock(env, child);
2095
2096                 RETURN(0);
2097         }
2098
2099         if (S_ISDIR(lfsck_object_type(child))) {
2100                 dt_read_unlock(env, child);
2101                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
2102
2103                 RETURN(rc);
2104         }
2105
2106         rc = lfsck_links_read(env, child, &ldata);
2107         dt_read_unlock(env, child);
2108         if (rc != 0)
2109                 GOTO(out, rc);
2110
2111         linkea_first_entry(&ldata);
2112         while (ldata.ld_lee != NULL) {
2113                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
2114                                                     info->lti_key);
2115                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
2116                                                          false);
2117                 /* Found repeated linkEA entries */
2118                 if (rc > 0) {
2119                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2120                                                 &ldata, cname, pfid, false);
2121                         if (rc < 0)
2122                                 GOTO(out, rc);
2123
2124                         if (rc == 0)
2125                                 continue;
2126
2127                         repaired = true;
2128
2129                         /* fall through */
2130                 }
2131
2132                 /* Invalid PFID in the linkEA entry. */
2133                 if (!fid_is_sane(pfid)) {
2134                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2135                                                 &ldata, cname, pfid, true);
2136                         if (rc < 0)
2137                                 GOTO(out, rc);
2138
2139                         if (rc > 0)
2140                                 repaired = true;
2141
2142                         continue;
2143                 }
2144
2145                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
2146                 if (IS_ERR(parent))
2147                         GOTO(out, rc = PTR_ERR(parent));
2148
2149                 if (!dt_object_exists(parent)) {
2150                         if (ldata.ld_leh->leh_reccount > 1) {
2151                                 /* If it is NOT the last linkEA entry, then
2152                                  * there is still other chance to make the
2153                                  * child to be visible via other parent, then
2154                                  * remove this linkEA entry. */
2155                                 rc = lfsck_namespace_shrink_linkea(env, com,
2156                                         child, &ldata, cname, pfid, true);
2157                         } else {
2158                                 /* Create the lost parent as an orphan. */
2159                                 rc = lfsck_namespace_create_orphan(env, com,
2160                                                                    parent);
2161                                 if (rc < 0) {
2162                                         lfsck_object_put(env, parent);
2163
2164                                         GOTO(out, rc);
2165                                 }
2166
2167                                 if (rc > 0)
2168                                         repaired = true;
2169
2170                                 /* Add the missing name entry to the parent. */
2171                                 rc = lfsck_namespace_insert_normal(env, com,
2172                                                 parent, child, cname->ln_name);
2173                                 linkea_next_entry(&ldata);
2174                         }
2175
2176                         lfsck_object_put(env, parent);
2177                         if (rc < 0)
2178                                 GOTO(out, rc);
2179
2180                         if (rc > 0)
2181                                 repaired = true;
2182
2183                         continue;
2184                 }
2185
2186                 /* The linkEA entry with bad parent will be removed. */
2187                 if (unlikely(!dt_try_as_dir(env, parent))) {
2188                         lfsck_object_put(env, parent);
2189                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2190                                                 &ldata, cname, pfid, true);
2191                         if (rc < 0)
2192                                 GOTO(out, rc);
2193
2194                         if (rc > 0)
2195                                 repaired = true;
2196
2197                         continue;
2198                 }
2199
2200                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
2201                                (const struct dt_key *)cname->ln_name,
2202                                BYPASS_CAPA);
2203                 if (rc != 0 && rc != -ENOENT) {
2204                         lfsck_object_put(env, parent);
2205
2206                         GOTO(out, rc);
2207                 }
2208
2209                 if (rc == 0) {
2210                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
2211                                 /* It is the most common case that we
2212                                  * find the name entry corresponding
2213                                  * to the linkEA entry. */
2214                                 lfsck_object_put(env, parent);
2215                                 linkea_next_entry(&ldata);
2216                         } else {
2217                                 /* The name entry references another
2218                                  * MDT-object that may be created by
2219                                  * the LFSCK for repairing dangling
2220                                  * name entry. Try to replace it. */
2221                                 rc = lfsck_namespace_replace_cond(env, com,
2222                                                 parent, child, cfid, cname);
2223                                 lfsck_object_put(env, parent);
2224                                 if (rc < 0)
2225                                         GOTO(out, rc);
2226
2227                                 if (rc > 0) {
2228                                         repaired = true;
2229                                         linkea_next_entry(&ldata);
2230                                 } else {
2231                                         rc = lfsck_namespace_shrink_linkea(env,
2232                                                         com, child, &ldata,
2233                                                         cname, pfid, true);
2234                                         if (rc < 0)
2235                                                 GOTO(out, rc);
2236
2237                                         if (rc > 0)
2238                                                 repaired = true;
2239                                 }
2240                         }
2241
2242                         continue;
2243                 }
2244
2245                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2246                 if (rc != 0)
2247                         GOTO(out, rc);
2248
2249                 /* If there is no name entry in the parent dir and the object
2250                  * link count is less than the linkea entries count, then the
2251                  * linkea entry should be removed. */
2252                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
2253                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2254                                         parent, child, &ldata, cname, pfid);
2255                         lfsck_object_put(env, parent);
2256                         if (rc < 0)
2257                                 GOTO(out, rc);
2258
2259                         if (rc > 0)
2260                                 repaired = true;
2261
2262                         continue;
2263                 }
2264
2265                 /* Add the missing name entry back to the namespace. */
2266                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2267                                                    cname->ln_name);
2268                 lfsck_object_put(env, parent);
2269                 if (rc < 0)
2270                         GOTO(out, rc);
2271
2272                 if (rc > 0)
2273                         repaired = true;
2274
2275                 linkea_next_entry(&ldata);
2276         }
2277
2278         GOTO(out, rc = 0);
2279
2280 out:
2281         if (rc < 0 && rc != -ENODATA)
2282                 return rc;
2283
2284         if (rc == 0) {
2285                 LASSERT(ldata.ld_leh != NULL);
2286
2287                 count = ldata.ld_leh->leh_reccount;
2288         }
2289
2290         if (count == 0) {
2291                 /* If the child becomes orphan, then insert it into
2292                  * the global .lustre/lost+found/MDTxxxx directory. */
2293                 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
2294                                                    &count);
2295                 if (rc < 0)
2296                         return rc;
2297
2298                 if (rc > 0) {
2299                         ns->ln_mul_ref_repaired++;
2300                         repaired = true;
2301                 }
2302         }
2303
2304         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2305         if (rc != 0)
2306                 return rc;
2307
2308         if (la->la_nlink != count) {
2309                 /* XXX: there will be other patch(es) for MDT-object
2310                  *      hard links verification. */
2311         }
2312
2313         if (repaired) {
2314                 if (la->la_nlink > 1)
2315                         ns->ln_mul_linked_repaired++;
2316
2317                 if (rc == 0)
2318                         rc = 1;
2319         }
2320
2321         return rc;
2322 }
2323
2324 static void lfsck_namespace_dump_statistics(struct seq_file *m,
2325                                             struct lfsck_namespace *ns,
2326                                             __u64 checked_phase1,
2327                                             __u64 checked_phase2,
2328                                             __u32 time_phase1,
2329                                             __u32 time_phase2)
2330 {
2331         seq_printf(m, "checked_phase1: "LPU64"\n"
2332                       "checked_phase2: "LPU64"\n"
2333                       "updated_phase1: "LPU64"\n"
2334                       "updated_phase2: "LPU64"\n"
2335                       "failed_phase1: "LPU64"\n"
2336                       "failed_phase2: "LPU64"\n"
2337                       "directories: "LPU64"\n"
2338                       "dirent_repaired: "LPU64"\n"
2339                       "linkea_repaired: "LPU64"\n"
2340                       "nlinks_repaired: "LPU64"\n"
2341                       "lost_found: "LPU64"\n"
2342                       "multiple_linked_checked: "LPU64"\n"
2343                       "multiple_linked_repaired: "LPU64"\n"
2344                       "unknown_inconsistency: "LPU64"\n"
2345                       "unmatched_pairs_repaired: "LPU64"\n"
2346                       "dangling_repaired: "LPU64"\n"
2347                       "multiple_referenced_repaired: "LPU64"\n"
2348                       "success_count: %u\n"
2349                       "run_time_phase1: %u seconds\n"
2350                       "run_time_phase2: %u seconds\n",
2351                       checked_phase1,
2352                       checked_phase2,
2353                       ns->ln_items_repaired,
2354                       ns->ln_objs_repaired_phase2,
2355                       ns->ln_items_failed,
2356                       ns->ln_objs_failed_phase2,
2357                       ns->ln_dirs_checked,
2358                       ns->ln_dirent_repaired,
2359                       ns->ln_linkea_repaired,
2360                       ns->ln_objs_nlink_repaired,
2361                       ns->ln_objs_lost_found,
2362                       ns->ln_mul_linked_checked,
2363                       ns->ln_mul_linked_repaired,
2364                       ns->ln_unknown_inconsistency,
2365                       ns->ln_unmatched_pairs_repaired,
2366                       ns->ln_dangling_repaired,
2367                       ns->ln_mul_ref_repaired,
2368                       ns->ln_success_count,
2369                       time_phase1,
2370                       time_phase2);
2371 }
2372
2373 /* namespace APIs */
2374
2375 static int lfsck_namespace_reset(const struct lu_env *env,
2376                                  struct lfsck_component *com, bool init)
2377 {
2378         struct lfsck_instance   *lfsck = com->lc_lfsck;
2379         struct lfsck_namespace  *ns    = com->lc_file_ram;
2380         struct dt_object        *root;
2381         struct dt_object        *dto;
2382         int                      rc;
2383         ENTRY;
2384
2385         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2386         if (IS_ERR(root))
2387                 GOTO(log, rc = PTR_ERR(root));
2388
2389         if (unlikely(!dt_try_as_dir(env, root)))
2390                 GOTO(put, rc = -ENOTDIR);
2391
2392         down_write(&com->lc_sem);
2393         if (init) {
2394                 memset(ns, 0, sizeof(*ns));
2395         } else {
2396                 __u32 count = ns->ln_success_count;
2397                 __u64 last_time = ns->ln_time_last_complete;
2398
2399                 memset(ns, 0, sizeof(*ns));
2400                 ns->ln_success_count = count;
2401                 ns->ln_time_last_complete = last_time;
2402         }
2403         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
2404         ns->ln_status = LS_INIT;
2405
2406         rc = local_object_unlink(env, lfsck->li_bottom, root,
2407                                  lfsck_namespace_name);
2408         if (rc != 0)
2409                 GOTO(out, rc);
2410
2411         lfsck_object_put(env, com->lc_obj);
2412         com->lc_obj = NULL;
2413         dto = local_index_find_or_create(env, lfsck->li_los, root,
2414                                          lfsck_namespace_name,
2415                                          S_IFREG | S_IRUGO | S_IWUSR,
2416                                          &dt_lfsck_features);
2417         if (IS_ERR(dto))
2418                 GOTO(out, rc = PTR_ERR(dto));
2419
2420         com->lc_obj = dto;
2421         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
2422         if (rc != 0)
2423                 GOTO(out, rc);
2424
2425         rc = lfsck_namespace_store(env, com, true);
2426
2427         GOTO(out, rc);
2428
2429 out:
2430         up_write(&com->lc_sem);
2431
2432 put:
2433         lu_object_put(env, &root->do_lu);
2434 log:
2435         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
2436                lfsck_lfsck2name(lfsck), rc);
2437         return rc;
2438 }
2439
2440 static void
2441 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
2442                      bool new_checked)
2443 {
2444         struct lfsck_namespace *ns = com->lc_file_ram;
2445
2446         down_write(&com->lc_sem);
2447         if (new_checked)
2448                 com->lc_new_checked++;
2449         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2450         up_write(&com->lc_sem);
2451 }
2452
2453 static int lfsck_namespace_checkpoint(const struct lu_env *env,
2454                                       struct lfsck_component *com, bool init)
2455 {
2456         struct lfsck_instance   *lfsck = com->lc_lfsck;
2457         struct lfsck_namespace  *ns    = com->lc_file_ram;
2458         int                      rc;
2459
2460         if (!init) {
2461                 rc = lfsck_checkpoint_generic(env, com);
2462                 if (rc != 0)
2463                         goto log;
2464         }
2465
2466         down_write(&com->lc_sem);
2467         if (init) {
2468                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
2469         } else {
2470                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
2471                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2472                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2473                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
2474                 ns->ln_items_checked += com->lc_new_checked;
2475                 com->lc_new_checked = 0;
2476         }
2477
2478         rc = lfsck_namespace_store(env, com, false);
2479         up_write(&com->lc_sem);
2480
2481 log:
2482         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
2483                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
2484                lfsck->li_pos_current.lp_oit_cookie,
2485                PFID(&lfsck->li_pos_current.lp_dir_parent),
2486                lfsck->li_pos_current.lp_dir_cookie, rc);
2487
2488         return rc > 0 ? 0 : rc;
2489 }
2490
2491 static int lfsck_namespace_prep(const struct lu_env *env,
2492                                 struct lfsck_component *com,
2493                                 struct lfsck_start_param *lsp)
2494 {
2495         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2496         struct lfsck_namespace  *ns     = com->lc_file_ram;
2497         struct lfsck_position   *pos    = &com->lc_pos_start;
2498         int                      rc;
2499
2500         if (ns->ln_status == LS_COMPLETED) {
2501                 rc = lfsck_namespace_reset(env, com, false);
2502                 if (rc == 0)
2503                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
2504
2505                 if (rc != 0) {
2506                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
2507                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
2508
2509                         return rc;
2510                 }
2511         }
2512
2513         down_write(&com->lc_sem);
2514         ns->ln_time_latest_start = cfs_time_current_sec();
2515         spin_lock(&lfsck->li_lock);
2516
2517         if (ns->ln_flags & LF_SCANNED_ONCE) {
2518                 if (!lfsck->li_drop_dryrun ||
2519                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2520                         ns->ln_status = LS_SCANNING_PHASE2;
2521                         list_move_tail(&com->lc_link,
2522                                        &lfsck->li_list_double_scan);
2523                         if (!list_empty(&com->lc_link_dir))
2524                                 list_del_init(&com->lc_link_dir);
2525                         lfsck_pos_set_zero(pos);
2526                 } else {
2527                         ns->ln_status = LS_SCANNING_PHASE1;
2528                         ns->ln_run_time_phase1 = 0;
2529                         ns->ln_run_time_phase2 = 0;
2530                         ns->ln_items_checked = 0;
2531                         ns->ln_items_repaired = 0;
2532                         ns->ln_items_failed = 0;
2533                         ns->ln_dirs_checked = 0;
2534                         ns->ln_objs_checked_phase2 = 0;
2535                         ns->ln_objs_repaired_phase2 = 0;
2536                         ns->ln_objs_failed_phase2 = 0;
2537                         ns->ln_objs_nlink_repaired = 0;
2538                         ns->ln_objs_lost_found = 0;
2539                         ns->ln_dirent_repaired = 0;
2540                         ns->ln_linkea_repaired = 0;
2541                         ns->ln_mul_linked_checked = 0;
2542                         ns->ln_mul_linked_repaired = 0;
2543                         ns->ln_unknown_inconsistency = 0;
2544                         ns->ln_unmatched_pairs_repaired = 0;
2545                         ns->ln_dangling_repaired = 0;
2546                         ns->ln_mul_ref_repaired = 0;
2547                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
2548                         if (list_empty(&com->lc_link_dir))
2549                                 list_add_tail(&com->lc_link_dir,
2550                                               &lfsck->li_list_dir);
2551                         *pos = ns->ln_pos_first_inconsistent;
2552                 }
2553         } else {
2554                 ns->ln_status = LS_SCANNING_PHASE1;
2555                 if (list_empty(&com->lc_link_dir))
2556                         list_add_tail(&com->lc_link_dir,
2557                                       &lfsck->li_list_dir);
2558                 if (!lfsck->li_drop_dryrun ||
2559                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2560                         *pos = ns->ln_pos_last_checkpoint;
2561                         pos->lp_oit_cookie++;
2562                 } else {
2563                         *pos = ns->ln_pos_first_inconsistent;
2564                 }
2565         }
2566
2567         spin_unlock(&lfsck->li_lock);
2568         up_write(&com->lc_sem);
2569
2570         rc = lfsck_start_assistant(env, com, lsp);
2571
2572         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
2573                DFID", "LPX64"]: rc = %d\n",
2574                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
2575                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
2576
2577         return rc;
2578 }
2579
2580 static int lfsck_namespace_exec_oit(const struct lu_env *env,
2581                                     struct lfsck_component *com,
2582                                     struct dt_object *obj)
2583 {
2584         struct lfsck_thread_info *info  = lfsck_env_info(env);
2585         struct lfsck_namespace   *ns    = com->lc_file_ram;
2586         struct lfsck_instance    *lfsck = com->lc_lfsck;
2587         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
2588         struct lu_attr           *la    = &info->lti_la;
2589         struct lu_fid            *pfid  = &info->lti_fid2;
2590         struct lu_name           *cname = &info->lti_name;
2591         struct lu_seq_range      *range = &info->lti_range;
2592         struct dt_device         *dev   = lfsck->li_bottom;
2593         struct seq_server_site   *ss    =
2594                                 lu_site2seq(dev->dd_lu_dev.ld_site);
2595         struct linkea_data        ldata = { 0 };
2596         __u32                     idx   = lfsck_dev_idx(dev);
2597         int                       rc;
2598         ENTRY;
2599
2600         rc = lfsck_links_read(env, obj, &ldata);
2601         if (rc == -ENOENT)
2602                 GOTO(out, rc = 0);
2603
2604         /* -EINVAL means crashed linkEA, should be verified. */
2605         if (rc == -EINVAL) {
2606                 rc = lfsck_namespace_trace_update(env, com, fid,
2607                                                   LNTF_CHECK_LINKEA, true);
2608                 if (rc == 0) {
2609                         struct lustre_handle lh = { 0 };
2610
2611                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2612                                               MDS_INODELOCK_UPDATE |
2613                                               MDS_INODELOCK_XATTR, LCK_EX);
2614                         if (rc == 0) {
2615                                 rc = lfsck_namespace_links_remove(env, com,
2616                                                                   obj);
2617                                 lfsck_ibits_unlock(&lh, LCK_EX);
2618                         }
2619                 }
2620
2621                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
2622         }
2623
2624         /* zero-linkEA object may be orphan, but it also maybe because
2625          * of upgrading. Currently, we cannot record it for double scan.
2626          * Because it may cause the LFSCK tracing file to be too large. */
2627         if (rc == -ENODATA) {
2628                 if (S_ISDIR(lfsck_object_type(obj)))
2629                         GOTO(out, rc = 0);
2630
2631                 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2632                 if (rc != 0)
2633                         GOTO(out, rc);
2634
2635                 if (la->la_nlink > 1)
2636                         rc = lfsck_namespace_trace_update(env, com, fid,
2637                                                 LNTF_CHECK_LINKEA, true);
2638
2639                 GOTO(out, rc);
2640         }
2641
2642         if (rc != 0)
2643                 GOTO(out, rc);
2644
2645         /* Record multiple-linked object. */
2646         if (ldata.ld_leh->leh_reccount > 1) {
2647                 rc = lfsck_namespace_trace_update(env, com, fid,
2648                                                   LNTF_CHECK_LINKEA, true);
2649
2650                 GOTO(out, rc);
2651         }
2652
2653         linkea_first_entry(&ldata);
2654         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
2655         if (!fid_is_sane(pfid)) {
2656                 rc = lfsck_namespace_trace_update(env, com, fid,
2657                                                   LNTF_CHECK_PARENT, true);
2658         } else {
2659                 fld_range_set_mdt(range);
2660                 rc = fld_local_lookup(env, ss->ss_server_fld,
2661                                       fid_seq(pfid), range);
2662                 if ((rc == -ENOENT) ||
2663                     (rc == 0 && range->lsr_index != idx)) {
2664                         rc = lfsck_namespace_trace_update(env, com, fid,
2665                                                 LNTF_CHECK_LINKEA, true);
2666                 } else {
2667                         if (S_ISDIR(lfsck_object_type(obj)))
2668                                 GOTO(out, rc = 0);
2669
2670                         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2671                         if (rc != 0)
2672                                 GOTO(out, rc);
2673
2674                         if (la->la_nlink > 1)
2675                                 rc = lfsck_namespace_trace_update(env, com,
2676                                                 fid, LNTF_CHECK_LINKEA, true);
2677                 }
2678         }
2679
2680         GOTO(out, rc);
2681
2682 out:
2683         down_write(&com->lc_sem);
2684         com->lc_new_checked++;
2685         if (S_ISDIR(lfsck_object_type(obj)))
2686                 ns->ln_dirs_checked++;
2687         if (rc != 0)
2688                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2689         up_write(&com->lc_sem);
2690
2691         return rc;
2692 }
2693
2694 static int lfsck_namespace_exec_dir(const struct lu_env *env,
2695                                     struct lfsck_component *com,
2696                                     struct lu_dirent *ent, __u16 type)
2697 {
2698         struct lfsck_assistant_data     *lad    = com->lc_data;
2699         struct lfsck_namespace_req      *lnr;
2700         bool                             wakeup = false;
2701
2702         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
2703         if (IS_ERR(lnr)) {
2704                 struct lfsck_namespace *ns = com->lc_file_ram;
2705
2706                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2707                 return PTR_ERR(lnr);
2708         }
2709
2710         spin_lock(&lad->lad_lock);
2711         if (lad->lad_assistant_status < 0) {
2712                 spin_unlock(&lad->lad_lock);
2713                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
2714                 return lad->lad_assistant_status;
2715         }
2716
2717         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
2718         if (lad->lad_prefetched == 0)
2719                 wakeup = true;
2720
2721         lad->lad_prefetched++;
2722         spin_unlock(&lad->lad_lock);
2723         if (wakeup)
2724                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
2725
2726         down_write(&com->lc_sem);
2727         com->lc_new_checked++;
2728         up_write(&com->lc_sem);
2729
2730         return 0;
2731 }
2732
2733 static int lfsck_namespace_post(const struct lu_env *env,
2734                                 struct lfsck_component *com,
2735                                 int result, bool init)
2736 {
2737         struct lfsck_instance   *lfsck = com->lc_lfsck;
2738         struct lfsck_namespace  *ns    = com->lc_file_ram;
2739         int                      rc;
2740         ENTRY;
2741
2742         lfsck_post_generic(env, com, &result);
2743
2744         down_write(&com->lc_sem);
2745         spin_lock(&lfsck->li_lock);
2746         if (!init)
2747                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
2748         if (result > 0) {
2749                 ns->ln_status = LS_SCANNING_PHASE2;
2750                 ns->ln_flags |= LF_SCANNED_ONCE;
2751                 ns->ln_flags &= ~LF_UPGRADE;
2752                 list_del_init(&com->lc_link_dir);
2753                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
2754         } else if (result == 0) {
2755                 ns->ln_status = lfsck->li_status;
2756                 if (ns->ln_status == 0)
2757                         ns->ln_status = LS_STOPPED;
2758                 if (ns->ln_status != LS_PAUSED) {
2759                         list_del_init(&com->lc_link_dir);
2760                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
2761                 }
2762         } else {
2763                 ns->ln_status = LS_FAILED;
2764                 list_del_init(&com->lc_link_dir);
2765                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
2766         }
2767         spin_unlock(&lfsck->li_lock);
2768
2769         if (!init) {
2770                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2771                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2772                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
2773                 ns->ln_items_checked += com->lc_new_checked;
2774                 com->lc_new_checked = 0;
2775         }
2776
2777         rc = lfsck_namespace_store(env, com, false);
2778         up_write(&com->lc_sem);
2779
2780         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
2781                lfsck_lfsck2name(lfsck), rc);
2782
2783         RETURN(rc);
2784 }
2785
2786 static int
2787 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
2788                      struct seq_file *m)
2789 {
2790         struct lfsck_instance   *lfsck = com->lc_lfsck;
2791         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
2792         struct lfsck_namespace  *ns    = com->lc_file_ram;
2793         int                      rc;
2794
2795         down_read(&com->lc_sem);
2796         seq_printf(m, "name: lfsck_namespace\n"
2797                    "magic: %#x\n"
2798                    "version: %d\n"
2799                    "status: %s\n",
2800                    ns->ln_magic,
2801                    bk->lb_version,
2802                    lfsck_status2names(ns->ln_status));
2803
2804         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
2805         if (rc < 0)
2806                 goto out;
2807
2808         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
2809         if (rc < 0)
2810                 goto out;
2811
2812         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
2813                              "time_since_last_completed");
2814         if (rc < 0)
2815                 goto out;
2816
2817         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
2818                              "time_since_latest_start");
2819         if (rc < 0)
2820                 goto out;
2821
2822         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
2823                              "time_since_last_checkpoint");
2824         if (rc < 0)
2825                 goto out;
2826
2827         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
2828                             "latest_start_position");
2829         if (rc < 0)
2830                 goto out;
2831
2832         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
2833                             "last_checkpoint_position");
2834         if (rc < 0)
2835                 goto out;
2836
2837         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
2838                             "first_failure_position");
2839         if (rc < 0)
2840                 goto out;
2841
2842         if (ns->ln_status == LS_SCANNING_PHASE1) {
2843                 struct lfsck_position pos;
2844                 const struct dt_it_ops *iops;
2845                 cfs_duration_t duration = cfs_time_current() -
2846                                           lfsck->li_time_last_checkpoint;
2847                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
2848                 __u64 speed = checked;
2849                 __u64 new_checked = com->lc_new_checked * HZ;
2850                 __u32 rtime = ns->ln_run_time_phase1 +
2851                               cfs_duration_sec(duration + HALF_SEC);
2852
2853                 if (duration != 0)
2854                         do_div(new_checked, duration);
2855                 if (rtime != 0)
2856                         do_div(speed, rtime);
2857                 lfsck_namespace_dump_statistics(m, ns, checked,
2858                                                 ns->ln_objs_checked_phase2,
2859                                                 rtime, ns->ln_run_time_phase2);
2860
2861                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2862                               "average_speed_phase2: N/A\n"
2863                               "real_time_speed_phase1: "LPU64" items/sec\n"
2864                               "real_time_speed_phase2: N/A\n",
2865                               speed,
2866                               new_checked);
2867
2868                 LASSERT(lfsck->li_di_oit != NULL);
2869
2870                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
2871
2872                 /* The low layer otable-based iteration position may NOT
2873                  * exactly match the namespace-based directory traversal
2874                  * cookie. Generally, it is not a serious issue. But the
2875                  * caller should NOT make assumption on that. */
2876                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
2877                 if (!lfsck->li_current_oit_processed)
2878                         pos.lp_oit_cookie--;
2879
2880                 spin_lock(&lfsck->li_lock);
2881                 if (lfsck->li_di_dir != NULL) {
2882                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
2883                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
2884                                 fid_zero(&pos.lp_dir_parent);
2885                                 pos.lp_dir_cookie = 0;
2886                         } else {
2887                                 pos.lp_dir_parent =
2888                                         *lfsck_dto2fid(lfsck->li_obj_dir);
2889                         }
2890                 } else {
2891                         fid_zero(&pos.lp_dir_parent);
2892                         pos.lp_dir_cookie = 0;
2893                 }
2894                 spin_unlock(&lfsck->li_lock);
2895                 lfsck_pos_dump(m, &pos, "current_position");
2896         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
2897                 cfs_duration_t duration = cfs_time_current() -
2898                                           lfsck->li_time_last_checkpoint;
2899                 __u64 checked = ns->ln_objs_checked_phase2 +
2900                                 com->lc_new_checked;
2901                 __u64 speed1 = ns->ln_items_checked;
2902                 __u64 speed2 = checked;
2903                 __u64 new_checked = com->lc_new_checked * HZ;
2904                 __u32 rtime = ns->ln_run_time_phase2 +
2905                               cfs_duration_sec(duration + HALF_SEC);
2906
2907                 if (duration != 0)
2908                         do_div(new_checked, duration);
2909                 if (ns->ln_run_time_phase1 != 0)
2910                         do_div(speed1, ns->ln_run_time_phase1);
2911                 if (rtime != 0)
2912                         do_div(speed2, rtime);
2913                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
2914                                                 checked,
2915                                                 ns->ln_run_time_phase1, rtime);
2916
2917                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2918                               "average_speed_phase2: "LPU64" objs/sec\n"
2919                               "real_time_speed_phase1: N/A\n"
2920                               "real_time_speed_phase2: "LPU64" objs/sec\n"
2921                               "current_position: "DFID"\n",
2922                               speed1,
2923                               speed2,
2924                               new_checked,
2925                               PFID(&ns->ln_fid_latest_scanned_phase2));
2926         } else {
2927                 __u64 speed1 = ns->ln_items_checked;
2928                 __u64 speed2 = ns->ln_objs_checked_phase2;
2929
2930                 if (ns->ln_run_time_phase1 != 0)
2931                         do_div(speed1, ns->ln_run_time_phase1);
2932                 if (ns->ln_run_time_phase2 != 0)
2933                         do_div(speed2, ns->ln_run_time_phase2);
2934                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
2935                                                 ns->ln_objs_checked_phase2,
2936                                                 ns->ln_run_time_phase1,
2937                                                 ns->ln_run_time_phase2);
2938
2939                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
2940                               "average_speed_phase2: "LPU64" objs/sec\n"
2941                               "real_time_speed_phase1: N/A\n"
2942                               "real_time_speed_phase2: N/A\n"
2943                               "current_position: N/A\n",
2944                               speed1,
2945                               speed2);
2946         }
2947 out:
2948         up_read(&com->lc_sem);
2949         return 0;
2950 }
2951
2952 static int lfsck_namespace_double_scan(const struct lu_env *env,
2953                                        struct lfsck_component *com)
2954 {
2955         struct lfsck_namespace *ns = com->lc_file_ram;
2956
2957         return lfsck_double_scan_generic(env, com, ns->ln_status);
2958 }
2959
2960 static void lfsck_namespace_data_release(const struct lu_env *env,
2961                                          struct lfsck_component *com)
2962 {
2963         struct lfsck_assistant_data     *lad    = com->lc_data;
2964         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
2965         struct lfsck_tgt_desc           *ltd;
2966         struct lfsck_tgt_desc           *next;
2967
2968         LASSERT(lad != NULL);
2969         LASSERT(thread_is_init(&lad->lad_thread) ||
2970                 thread_is_stopped(&lad->lad_thread));
2971         LASSERT(list_empty(&lad->lad_req_list));
2972
2973         com->lc_data = NULL;
2974
2975         spin_lock(&ltds->ltd_lock);
2976         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
2977                                  ltd_namespace_phase_list) {
2978                 list_del_init(&ltd->ltd_namespace_phase_list);
2979         }
2980         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
2981                                  ltd_namespace_phase_list) {
2982                 list_del_init(&ltd->ltd_namespace_phase_list);
2983         }
2984         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
2985                                  ltd_namespace_list) {
2986                 list_del_init(&ltd->ltd_namespace_list);
2987         }
2988         spin_unlock(&ltds->ltd_lock);
2989
2990         CFS_FREE_BITMAP(lad->lad_bitmap);
2991
2992         OBD_FREE_PTR(lad);
2993 }
2994
2995 static int lfsck_namespace_in_notify(const struct lu_env *env,
2996                                      struct lfsck_component *com,
2997                                      struct lfsck_request *lr)
2998 {
2999         struct lfsck_instance           *lfsck = com->lc_lfsck;
3000         struct lfsck_namespace          *ns    = com->lc_file_ram;
3001         struct lfsck_assistant_data     *lad   = com->lc_data;
3002         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
3003         struct lfsck_tgt_desc           *ltd;
3004         bool                             fail  = false;
3005         ENTRY;
3006
3007         if (lr->lr_event != LE_PHASE1_DONE &&
3008             lr->lr_event != LE_PHASE2_DONE &&
3009             lr->lr_event != LE_PEER_EXIT)
3010                 RETURN(-EINVAL);
3011
3012         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
3013                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
3014                lr->lr_index, lr->lr_status);
3015
3016         spin_lock(&ltds->ltd_lock);
3017         ltd = LTD_TGT(ltds, lr->lr_index);
3018         if (ltd == NULL) {
3019                 spin_unlock(&ltds->ltd_lock);
3020
3021                 RETURN(-ENXIO);
3022         }
3023
3024         list_del_init(&ltd->ltd_namespace_phase_list);
3025         switch (lr->lr_event) {
3026         case LE_PHASE1_DONE:
3027                 if (lr->lr_status <= 0) {
3028                         ltd->ltd_namespace_done = 1;
3029                         list_del_init(&ltd->ltd_namespace_list);
3030                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
3031                                "phase1 for namespace LFSCK: rc = %d.\n",
3032                                lfsck_lfsck2name(lfsck),
3033                                ltd->ltd_index, lr->lr_status);
3034                         ns->ln_flags |= LF_INCOMPLETE;
3035                         fail = true;
3036                         break;
3037                 }
3038
3039                 if (list_empty(&ltd->ltd_namespace_list))
3040                         list_add_tail(&ltd->ltd_namespace_list,
3041                                       &lad->lad_mdt_list);
3042                 list_add_tail(&ltd->ltd_namespace_phase_list,
3043                               &lad->lad_mdt_phase2_list);
3044                 break;
3045         case LE_PHASE2_DONE:
3046                 ltd->ltd_namespace_done = 1;
3047                 list_del_init(&ltd->ltd_namespace_list);
3048                 break;
3049         case LE_PEER_EXIT:
3050                 fail = true;
3051                 ltd->ltd_namespace_done = 1;
3052                 list_del_init(&ltd->ltd_namespace_list);
3053                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
3054                         CDEBUG(D_LFSCK,
3055                                "%s: the peer MDT %x exit namespace LFSCK\n",
3056                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
3057                         ns->ln_flags |= LF_INCOMPLETE;
3058                 }
3059                 break;
3060         default:
3061                 break;
3062         }
3063         spin_unlock(&ltds->ltd_lock);
3064
3065         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
3066                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3067
3068                 memset(stop, 0, sizeof(*stop));
3069                 stop->ls_status = lr->lr_status;
3070                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3071                 lfsck_stop(env, lfsck->li_bottom, stop);
3072         } else if (lfsck_phase2_next_ready(lad)) {
3073                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
3074         }
3075
3076         RETURN(0);
3077 }
3078
3079 static int lfsck_namespace_query(const struct lu_env *env,
3080                                  struct lfsck_component *com)
3081 {
3082         struct lfsck_namespace *ns = com->lc_file_ram;
3083
3084         return ns->ln_status;
3085 }
3086
/*
 * Operation table of the namespace LFSCK component. Each callback slot is
 * bound to its namespace-specific implementation from this file, except
 * .lfsck_quit which uses the generic lfsck_quit_generic().
 */
static struct lfsck_operations lfsck_namespace_ops = {
        .lfsck_reset            = lfsck_namespace_reset,
        .lfsck_fail             = lfsck_namespace_fail,
        .lfsck_checkpoint       = lfsck_namespace_checkpoint,
        .lfsck_prep             = lfsck_namespace_prep,
        .lfsck_exec_oit         = lfsck_namespace_exec_oit,
        .lfsck_exec_dir         = lfsck_namespace_exec_dir,
        .lfsck_post             = lfsck_namespace_post,
        .lfsck_dump             = lfsck_namespace_dump,
        .lfsck_double_scan      = lfsck_namespace_double_scan,
        .lfsck_data_release     = lfsck_namespace_data_release,
        .lfsck_quit             = lfsck_quit_generic,
        .lfsck_in_notify        = lfsck_namespace_in_notify,
        .lfsck_query            = lfsck_namespace_query,
};
3102
3103 /**
3104  * Repair dangling name entry.
3105  *
3106  * For the name entry with dangling reference, we need to repare the
3107  * inconsistency according to the LFSCK sponsor's requirement:
3108  *
3109  * 1) Keep the inconsistency there and report the inconsistency case,
3110  *    then give the chance to the application to find related issues,
3111  *    and the users can make the decision about how to handle it with
3112  *    more human knownledge. (by default)
3113  *
3114  * 2) Re-create the missed MDT-object with the FID information.
3115  *
3116  * \param[in] env       pointer to the thread context
3117  * \param[in] com       pointer to the lfsck component
3118  * \param[in] child     pointer to the object corresponding to the dangling
3119  *                      name entry
3120  * \param[in] lnr       pointer to the namespace request that contains the
3121  *                      name's name, parent object, parent's LMV, and ect.
3122  *
3123  * \retval              positive number if no need to repair
3124  * \retval              zero for repaired successfully
3125  * \retval              negative error number on failure
3126  */
3127 int lfsck_namespace_repair_dangling(const struct lu_env *env,
3128                                     struct lfsck_component *com,
3129                                     struct dt_object *child,
3130                                     struct lfsck_namespace_req *lnr)
3131 {
3132         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3133         struct lu_attr                  *la     = &info->lti_la;
3134         struct dt_allocation_hint       *hint   = &info->lti_hint;
3135         struct dt_object_format         *dof    = &info->lti_dof;
3136         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
3137         struct dt_object                *parent = lnr->lnr_obj;
3138         const struct lu_name            *cname;
3139         struct linkea_data               ldata  = { 0 };
3140         struct lustre_handle             lh     = { 0 };
3141         struct lu_buf                    linkea_buf;
3142         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3143         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
3144         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3145         struct thandle                  *th     = NULL;
3146         int                              rc     = 0;
3147         __u16                            type   = lnr->lnr_type;
3148         bool                             create;
3149         ENTRY;
3150
3151         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
3152         if (bk->lb_param & LPF_CREATE_MDTOBJ)
3153                 create = true;
3154         else
3155                 create = false;
3156
3157         if (!create || bk->lb_param & LPF_DRYRUN)
3158                 GOTO(log, rc = 0);
3159
3160         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
3161         if (rc != 0)
3162                 GOTO(log, rc);
3163
3164         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
3165         if (rc != 0)
3166                 GOTO(log, rc);
3167
3168         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3169                               MDS_INODELOCK_UPDATE, LCK_EX);
3170         if (rc != 0)
3171                 GOTO(log, rc);
3172
3173         rc = lfsck_namespace_check_exist(env, parent, child, lnr->lnr_name);
3174         if (rc != 0)
3175                 GOTO(log, rc);
3176
3177         th = dt_trans_create(env, dev);
3178         if (IS_ERR(th))
3179                 GOTO(log, rc = PTR_ERR(th));
3180
3181         /* Set the ctime as zero, then others can know it is created for
3182          * repairing dangling name entry by LFSCK. And if the LFSCK made
3183          * wrong decision and the real MDT-object has been found later,
3184          * then the LFSCK has chance to fix the incosistency properly. */
3185         memset(la, 0, sizeof(*la));
3186         la->la_mode = (type & S_IFMT) | 0600;
3187         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
3188                         LA_ATIME | LA_MTIME | LA_CTIME;
3189
3190         child->do_ops->do_ah_init(env, hint, parent, child,
3191                                   la->la_mode & S_IFMT);
3192
3193         memset(dof, 0, sizeof(*dof));
3194         dof->dof_type = dt_mode_to_dft(type);
3195         /* If the target is a regular file, then the LFSCK will only create
3196          * the MDT-object without stripes (dof->dof_reg.striped = 0). related
3197          * OST-objects will be created when write open. */
3198
3199         /* 1a. create child. */
3200         rc = dt_declare_create(env, child, la, hint, dof, th);
3201         if (rc != 0)
3202                 GOTO(stop, rc);
3203
3204         if (S_ISDIR(type)) {
3205                 if (unlikely(!dt_try_as_dir(env, child)))
3206                         GOTO(stop, rc = -ENOTDIR);
3207
3208                 /* 2a. insert dot into child dir */
3209                 rec->rec_type = S_IFDIR;
3210                 rec->rec_fid = lfsck_dto2fid(child);
3211                 rc = dt_declare_insert(env, child,
3212                                        (const struct dt_rec *)rec,
3213                                        (const struct dt_key *)dot, th);
3214                 if (rc != 0)
3215                         GOTO(stop, rc);
3216
3217                 /* 3a. insert dotdot into child dir */
3218                 rec->rec_fid = lfsck_dto2fid(parent);
3219                 rc = dt_declare_insert(env, child,
3220                                        (const struct dt_rec *)rec,
3221                                        (const struct dt_key *)dotdot, th);
3222                 if (rc != 0)
3223                         GOTO(stop, rc);
3224
3225                 /* 4a. increase child nlink */
3226                 rc = dt_declare_ref_add(env, child, th);
3227                 if (rc != 0)
3228                         GOTO(stop, rc);
3229         }
3230
3231         /* 5a. insert linkEA for child */
3232         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
3233                        ldata.ld_leh->leh_len);
3234         rc = dt_declare_xattr_set(env, child, &linkea_buf,
3235                                   XATTR_NAME_LINK, 0, th);
3236         if (rc != 0)
3237                 GOTO(stop, rc);
3238
3239         rc = dt_trans_start(env, dev, th);
3240         if (rc != 0)
3241                 GOTO(stop, rc = (rc == -EEXIST ? 1 : rc));
3242
3243         dt_write_lock(env, child, 0);
3244         /* 1b. create child */
3245         rc = dt_create(env, child, la, hint, dof, th);
3246         if (rc != 0)
3247                 GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc));
3248
3249         if (S_ISDIR(type)) {
3250                 if (unlikely(!dt_try_as_dir(env, child)))
3251                         GOTO(unlock, rc = -ENOTDIR);
3252
3253                 /* 2b. insert dot into child dir */
3254                 rec->rec_type = S_IFDIR;
3255                 rec->rec_fid = lfsck_dto2fid(child);
3256                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
3257                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
3258                 if (rc != 0)
3259                         GOTO(unlock, rc);
3260
3261                 /* 3b. insert dotdot into child dir */
3262                 rec->rec_fid = lfsck_dto2fid(parent);
3263                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
3264                                (const struct dt_key *)dotdot, th,
3265                                BYPASS_CAPA, 1);
3266                 if (rc != 0)
3267                         GOTO(unlock, rc);
3268
3269                 /* 4b. increase child nlink */
3270                 rc = dt_ref_add(env, child, th);
3271                 if (rc != 0)
3272                         GOTO(unlock, rc);
3273         }
3274
3275         /* 5b. insert linkEA for child. */
3276         rc = dt_xattr_set(env, child, &linkea_buf,
3277                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
3278
3279         GOTO(unlock, rc);
3280
3281 unlock:
3282         dt_write_unlock(env, child);
3283
3284 stop:
3285         dt_trans_stop(env, dev, th);
3286
3287 log:
3288         lfsck_ibits_unlock(&lh, LCK_EX);
3289         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
3290                "reference for: parent "DFID", child "DFID", type %u, "
3291                "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
3292                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
3293                type, cname->ln_name,
3294                create ? "Create the lost OST-object as required" :
3295                         "Keep the MDT-object there by default", rc);
3296
3297         if (rc <= 0) {
3298                 struct lfsck_namespace *ns = com->lc_file_ram;
3299
3300                 ns->ln_flags |= LF_INCONSISTENT;
3301         }
3302
3303         return rc;
3304 }
3305
/**
 * Handle one name entry request in the namespace LFSCK assistant (phase1).
 *
 * The request describes a name entry (lnr_name, lnr_fid) found under the
 * directory lnr_obj. The handler locates the object the entry refers to,
 * hands a non-existing target to lfsck_namespace_repair_dangling(), and
 * otherwise verifies that the target's linkEA contains the (name, parent
 * FID) pair, adding or rebuilding the linkEA when it is missing or
 * unusable (unless LPF_DRYRUN is set). Multiple-linked non-directory
 * objects are recorded in the trace file with LNTF_CHECK_LINKEA.
 *
 * The statistics in the namespace file are updated under com->lc_sem.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] lar       pointer to the assistant request, embedded in a
 *                      struct lfsck_namespace_req
 *
 * \retval              zero on success, and also on failure when
 *                      LPF_FAILOUT is not set
 * \retval              negative error number on failure when LPF_FAILOUT
 *                      is set
 */
static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                                                struct lfsck_component *com,
                                                struct lfsck_assistant_req *lar)
{
        struct lfsck_thread_info   *info     = lfsck_env_info(env);
        struct lu_attr             *la       = &info->lti_la;
        struct lfsck_instance      *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
        struct lfsck_namespace     *ns       = com->lc_file_ram;
        struct linkea_data          ldata    = { 0 };
        const struct lu_name       *cname;
        struct thandle             *handle   = NULL;
        struct lfsck_namespace_req *lnr      =
                        container_of0(lar, struct lfsck_namespace_req, lnr_lar);
        struct dt_object           *dir      = lnr->lnr_obj;
        struct dt_object           *obj      = NULL;
        const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
        struct dt_device           *dev;
        struct lustre_handle        lh       = { 0 };
        bool                        repaired = false;
        bool                        dtlocked = false;
        bool                        remove;
        bool                        newdata;
        bool                        log      = false;
        int                         idx;
        int                         count    = 0;
        int                         rc;
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
        ENTRY;

        /* The request attributes flag an entry as upgraded or repaired;
         * account for that before looking at the target. */
        if (lnr->lnr_attr & LUDA_UPGRADE) {
                ns->ln_flags |= LF_UPGRADE;
                ns->ln_dirent_repaired++;
                repaired = true;
        } else if (lnr->lnr_attr & LUDA_REPAIR) {
                ns->ln_flags |= LF_INCONSISTENT;
                ns->ln_dirent_repaired++;
                repaired = true;
        }

        /* A zero FID is only tolerated for the ".." entry, in which case
         * the parent directory is marked for a later parent check. */
        if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
                if (strcmp(lnr->lnr_name, dotdot) != 0)
                        LBUG();
                else
                        rc = lfsck_namespace_trace_update(env, com, pfid,
                                                LNTF_CHECK_PARENT, true);

                GOTO(out, rc);
        }

        /* Nothing to verify for "." or for a dot-prefixed name whose FID
         * sequence satisfies fid_seq_is_dot(). */
        if (lnr->lnr_name[0] == '.' &&
            (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
                GOTO(out, rc = 0);

        /* Find out which MDT the target FID belongs to. */
        idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
        if (idx < 0)
                GOTO(out, rc = idx);

        if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
                /* Local target: the local ".." entry is not checked. */
                if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
                        GOTO(out, rc = 0);

                dev = lfsck->li_next;
        } else {
                struct lfsck_tgt_desc *ltd;

                /* Usually, some local filesystem consistency verification
                 * tools can guarantee the local namespace tree consistent.
                 * So the LFSCK will only verify the remote directory. */
                if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
                        rc = lfsck_namespace_trace_update(env, com, pfid,
                                                LNTF_CHECK_PARENT, true);

                        GOTO(out, rc);
                }

                ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
                if (unlikely(ltd == NULL)) {
                        CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
                               "did not join the namespace LFSCK\n",
                               lfsck_lfsck2name(lfsck), idx);
                        ns->ln_flags |= LF_INCOMPLETE;

                        GOTO(out, rc = -ENODEV);
                }

                dev = ltd->ltd_tgt;
        }

        obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));

        cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
        if (dt_object_exists(obj) == 0) {

/* Also reached via goto when reading the linkEA returns -ENOENT. */
dangling:
                rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
                if (rc == 0) {
                        type = LNIT_DANGLING;
                        rc = lfsck_namespace_repair_dangling(env, com,
                                                             obj, lnr);
                        if (rc == 0)
                                repaired = true;
                }

                GOTO(out, rc);
        }

        if (!(bk->lb_param & LPF_DRYRUN) && repaired) {

/* Also reached from "nodata" when a modification is needed but the
 * ibits lock has not been taken yet: take the lock, start a transaction
 * and write-lock the object, then redo the checks below. */
again:
                rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
                                      MDS_INODELOCK_UPDATE |
                                      MDS_INODELOCK_XATTR, LCK_EX);
                if (rc != 0)
                        GOTO(out, rc);

                handle = dt_trans_create(env, dev);
                if (IS_ERR(handle))
                        GOTO(out, rc = PTR_ERR(handle));

                rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
                if (rc != 0)
                        GOTO(stop, rc);

                rc = dt_trans_start(env, dev, handle);
                if (rc != 0)
                        GOTO(stop, rc);

                dt_write_lock(env, obj, 0);
                dtlocked = true;
        }

        /* Any non-zero result ends the handling of this entry. */
        rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
        if (rc != 0)
                GOTO(stop, rc);

        rc = lfsck_links_read(env, obj, &ldata);
        if (unlikely(rc == -ENOENT)) {
                /* Drop the object lock, transaction and ibits lock (if
                 * held) before switching to the dangling handling. */
                if (handle != NULL) {
                        dt_write_unlock(env, obj);
                        dtlocked = false;

                        dt_trans_stop(env, dev, handle);
                        handle = NULL;

                        lfsck_ibits_unlock(&lh, LCK_EX);
                }

                /* It may happen when the remote object has been removed,
                 * but the local MDT is not aware of that. */
                goto dangling;
        } else if (rc == 0) {
                count = ldata.ld_leh->leh_reccount;
                rc = linkea_links_find(&ldata, cname, pfid);
                /* The linkEA already holds this entry and, for a
                 * directory, it is the only record: nothing to fix. */
                if ((rc == 0) &&
                    (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
                        goto record;

                ns->ln_flags |= LF_INCONSISTENT;
                /* For sub-dir object, we cannot make sure whether the sub-dir
                 * back references the parent via ".." name entry correctly or
                 * not in the LFSCK first-stage scanning. It may be that the
                 * (remote) sub-dir ".." name entry has no parent FID after
                 * file-level backup/restore and its linkEA may be wrong.
                 * So under such case, we should replace the linkEA according
                 * to current name entry. But this needs to be done during the
                 * LFSCK second-stage scanning. The LFSCK will record the name
                 * entry for further possible using. */
                remove = false;
                newdata = false;
                goto nodata;
        } else if (unlikely(rc == -EINVAL)) {
                count = 1;
                ns->ln_flags |= LF_INCONSISTENT;
                /* The magic crashed, we are not sure whether there are more
                 * corrupt data in the linkea, so remove all linkea entries. */
                remove = true;
                newdata = true;
                goto nodata;
        } else if (rc == -ENODATA) {
                /* No linkEA at all: treated as an upgrade case, a new
                 * linkEA is built from the current name entry. */
                count = 1;
                ns->ln_flags |= LF_UPGRADE;
                remove = false;
                newdata = true;

/* Common path for adding the current name entry to the linkEA;
 * "remove" and "newdata" have been set by the branch that got here. */
nodata:
                if (bk->lb_param & LPF_DRYRUN) {
                        /* Dry run: only count the would-be repair. */
                        ns->ln_linkea_repaired++;
                        repaired = true;
                        log = true;
                        goto record;
                }

                /* Modifications need the ibits lock and a transaction. */
                if (!lustre_handle_is_used(&lh))
                        goto again;

                if (remove) {
                        LASSERT(newdata);

                        rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
                                          BYPASS_CAPA);
                        if (rc != 0)
                                GOTO(stop, rc);
                }

                if (newdata) {
                        rc = linkea_data_new(&ldata,
                                        &lfsck_env_info(env)->lti_linkea_buf);
                        if (rc != 0)
                                GOTO(stop, rc);
                }

                rc = linkea_add_buf(&ldata, cname, pfid);
                if (rc != 0)
                        GOTO(stop, rc);

                rc = lfsck_links_write(env, obj, &ldata, handle);
                if (rc != 0)
                        GOTO(stop, rc);

                count = ldata.ld_leh->leh_reccount;
                /* A remote directory is not counted as repaired here. */
                if (!S_ISDIR(lfsck_object_type(obj)) ||
                    !dt_object_remote(obj)) {
                        ns->ln_linkea_repaired++;
                        repaired = true;
                        log = true;
                }
        } else {
                GOTO(stop, rc);
        }

record:
        LASSERT(count > 0);

        rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
        if (rc != 0)
                GOTO(stop, rc);

        if ((count == 1 && la->la_nlink == 1) ||
            S_ISDIR(lfsck_object_type(obj)))
                /* Usually, it is for single linked object or dir, do nothing.*/
                GOTO(stop, rc);

        /* Following modification will be in another transaction.  */
        if (handle != NULL) {
                dt_write_unlock(env, obj);
                dtlocked = false;

                dt_trans_stop(env, dev, handle);
                handle = NULL;

                lfsck_ibits_unlock(&lh, LCK_EX);
        }

        /* Record the multiple-linked object in the trace file with
         * LNTF_CHECK_LINKEA. */
        ns->ln_mul_linked_checked++;
        rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
                                          LNTF_CHECK_LINKEA, true);

        GOTO(out, rc);

stop:
        if (dtlocked)
                dt_write_unlock(env, obj);

        if (handle != NULL && !IS_ERR(handle))
                dt_trans_stop(env, dev, handle);

out:
        lfsck_ibits_unlock(&lh, LCK_EX);
        /* Update the statistics under the component semaphore. */
        down_write(&com->lc_sem);
        if (rc < 0) {
                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
                       "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
                       PFID(lfsck_dto2fid(lnr->lnr_obj)),
                       lnr->lnr_namelen, lnr->lnr_name, rc);

                lfsck_namespace_record_failure(env, lfsck, ns);
                /* Without LPF_FAILOUT the failure is recorded but not
                 * returned to the caller. */
                if (!(bk->lb_param & LPF_FAILOUT))
                        rc = 0;
        } else {
                if (log)
                        CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
                               "repaired the entry: "DFID", parent "DFID
                               ", name %.*s\n", lfsck_lfsck2name(lfsck),
                               PFID(&lnr->lnr_fid),
                               PFID(lfsck_dto2fid(lnr->lnr_obj)),
                               lnr->lnr_namelen, lnr->lnr_name);

                if (repaired) {
                        ns->ln_items_repaired++;

                        switch (type) {
                        case LNIT_DANGLING:
                                ns->ln_dangling_repaired++;
                                break;
                        default:
                                break;
                        }

                        /* In dry-run mode, remember the position of the
                         * first inconsistency if none is recorded yet. */
                        if (bk->lb_param & LPF_DRYRUN &&
                            lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
                                lfsck_pos_fill(env, lfsck,
                                               &ns->ln_pos_first_inconsistent,
                                               false);
                }
                rc = 0;
        }
        up_write(&com->lc_sem);

        if (obj != NULL && !IS_ERR(obj))
                lfsck_object_put(env, obj);
        return rc;
}
3622
3623 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
3624                                                 struct lfsck_component *com)
3625 {
3626         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3627         struct ptlrpc_thread    *thread = &lfsck->li_thread;
3628         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
3629         struct lfsck_namespace  *ns     = com->lc_file_ram;
3630         struct dt_object        *obj    = com->lc_obj;
3631         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
3632         struct dt_object        *target;
3633         struct dt_it            *di;
3634         struct dt_key           *key;
3635         struct lu_fid            fid;
3636         int                      rc;
3637         __u8                     flags  = 0;
3638         ENTRY;
3639
3640         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
3641                lfsck_lfsck2name(lfsck));
3642
3643         com->lc_new_checked = 0;
3644         com->lc_new_scanned = 0;
3645         com->lc_time_last_checkpoint = cfs_time_current();
3646         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
3647                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3648
3649         di = iops->init(env, obj, 0, BYPASS_CAPA);
3650         if (IS_ERR(di))
3651                 RETURN(PTR_ERR(di));
3652
3653         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
3654         rc = iops->get(env, di, (const struct dt_key *)&fid);
3655         if (rc < 0)
3656                 GOTO(fini, rc);
3657
3658         /* Skip the start one, which either has been processed or non-exist. */
3659         rc = iops->next(env, di);
3660         if (rc != 0)
3661                 GOTO(put, rc);
3662
3663         do {
3664                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
3665                     cfs_fail_val > 0) {
3666                         struct l_wait_info lwi;
3667
3668                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
3669                                           NULL, NULL);
3670                         l_wait_event(thread->t_ctl_waitq,
3671                                      !thread_is_running(thread),
3672                                      &lwi);
3673
3674                         if (unlikely(!thread_is_running(thread)))
3675                                 GOTO(put, rc = 0);
3676                 }
3677
3678                 key = iops->key(env, di);
3679                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
3680                 if (!fid_is_sane(&fid)) {
3681                         rc = 0;
3682                         goto checkpoint;
3683                 }
3684
3685                 target = lfsck_object_find(env, lfsck, &fid);
3686                 if (IS_ERR(target)) {
3687                         rc = PTR_ERR(target);
3688                         goto checkpoint;
3689                 }
3690
3691                 if (dt_object_exists(target)) {
3692                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
3693                         if (rc == 0) {
3694                                 rc = lfsck_namespace_double_scan_one(env, com,
3695                                                                 target, flags);
3696                                 if (rc == -ENOENT)
3697                                         rc = 0;
3698                         }
3699                 }
3700
3701                 lfsck_object_put(env, target);
3702
3703 checkpoint:
3704                 down_write(&com->lc_sem);
3705                 com->lc_new_checked++;
3706                 com->lc_new_scanned++;
3707                 ns->ln_fid_latest_scanned_phase2 = fid;
3708                 if (rc > 0)
3709                         ns->ln_objs_repaired_phase2++;
3710                 else if (rc < 0)
3711                         ns->ln_objs_failed_phase2++;
3712                 up_write(&com->lc_sem);
3713
3714                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3715                         GOTO(put, rc);
3716
3717                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
3718                                               cfs_time_current())) &&
3719                     com->lc_new_checked != 0) {
3720                         down_write(&com->lc_sem);
3721                         ns->ln_run_time_phase2 +=
3722                                 cfs_duration_sec(cfs_time_current() +
3723                                 HALF_SEC - com->lc_time_last_checkpoint);
3724                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
3725                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
3726                         com->lc_new_checked = 0;
3727                         rc = lfsck_namespace_store(env, com, false);
3728                         up_write(&com->lc_sem);
3729                         if (rc != 0)
3730                                 GOTO(put, rc);
3731
3732                         com->lc_time_last_checkpoint = cfs_time_current();
3733                         com->lc_time_next_checkpoint =
3734                                 com->lc_time_last_checkpoint +
3735                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3736                 }
3737
3738                 lfsck_control_speed_by_self(com);
3739                 if (unlikely(!thread_is_running(thread)))
3740                         GOTO(put, rc = 0);
3741
3742                 rc = iops->next(env, di);
3743         } while (rc == 0);
3744
3745         GOTO(put, rc);
3746
3747 put:
3748         iops->put(env, di);
3749
3750 fini:
3751         iops->fini(env, di);
3752
3753         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
3754                lfsck_lfsck2name(lfsck), rc);
3755
3756         return rc;
3757 }
3758
3759 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
3760                                                struct lfsck_component *com,
3761                                                struct lfsck_position *pos)
3762 {
3763         struct lfsck_assistant_data     *lad = com->lc_data;
3764         struct lfsck_namespace_req      *lnr;
3765
3766         if (list_empty(&lad->lad_req_list))
3767                 return;
3768
3769         lnr = list_entry(lad->lad_req_list.next,
3770                          struct lfsck_namespace_req,
3771                          lnr_lar.lar_list);
3772         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
3773         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
3774         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
3775 }
3776
3777 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
3778                                               struct lfsck_component *com,
3779                                               int rc)
3780 {
3781         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3782         struct lfsck_namespace  *ns     = com->lc_file_ram;
3783
3784         down_write(&com->lc_sem);
3785         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
3786                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3787         ns->ln_time_last_checkpoint = cfs_time_current_sec();
3788         ns->ln_objs_checked_phase2 += com->lc_new_checked;
3789         com->lc_new_checked = 0;
3790
3791         if (rc > 0) {
3792                 if (ns->ln_flags & LF_INCOMPLETE)
3793                         ns->ln_status = LS_PARTIAL;
3794                 else
3795                         ns->ln_status = LS_COMPLETED;
3796                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
3797                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
3798                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
3799                 ns->ln_success_count++;
3800         } else if (rc == 0) {
3801                 ns->ln_status = lfsck->li_status;
3802                 if (ns->ln_status == 0)
3803                         ns->ln_status = LS_STOPPED;
3804         } else {
3805                 ns->ln_status = LS_FAILED;
3806         }
3807
3808         rc = lfsck_namespace_store(env, com, false);
3809         up_write(&com->lc_sem);
3810
3811         return rc;
3812 }
3813
/**
 * Hook for synchronizing failures of the namespace LFSCK.
 *
 * Not implemented yet: the function is a deliberate no-op so that the
 * assistant operations table has a valid la_sync_failures entry.
 *
 * \param[in] env	pointer to the thread context (unused)
 * \param[in] com	pointer to the lfsck component (unused)
 * \param[in] lr	pointer to the lfsck request (unused)
 */
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
						    struct lfsck_component *com,
						    struct lfsck_request *lr)
{
	/* XXX: to be defined; intentionally empty for now. */
}
3820
3821 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
3822         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
3823         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
3824         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
3825         .la_double_scan_result  = lfsck_namespace_double_scan_result,
3826         .la_req_fini            = lfsck_namespace_assistant_req_fini,
3827         .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
3828 };
3829
3830 /**
3831  * Verify the specified linkEA entry for the given directory object.
3832  * If the object has no such linkEA entry or it has more other linkEA
3833  * entries, then re-generate the linkEA with the given information.
3834  *
3835  * \param[in] env       pointer to the thread context
3836  * \param[in] dev       pointer to the dt_device
3837  * \param[in] obj       pointer to the dt_object to be handled
3838  * \param[in] cname     the name for the child in the parent directory
3839  * \param[in] pfid      the parent directory's FID for the linkEA
3840  *
3841  * \retval              0 for success
3842  * \retval              negative error number on failure
3843  */
3844 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
3845                         struct dt_object *obj, const struct lu_name *cname,
3846                         const struct lu_fid *pfid)
3847 {
3848         struct linkea_data       ldata  = { 0 };
3849         struct lu_buf            linkea_buf;
3850         struct thandle          *th;
3851         int                      rc;
3852         int                      fl     = LU_XATTR_CREATE;
3853         bool                     dirty  = false;
3854         ENTRY;
3855
3856         LASSERT(S_ISDIR(lfsck_object_type(obj)));
3857
3858         rc = lfsck_links_read(env, obj, &ldata);
3859         if (rc == -ENODATA) {
3860                 dirty = true;
3861         } else if (rc == 0) {
3862                 fl = LU_XATTR_REPLACE;
3863                 if (ldata.ld_leh->leh_reccount != 1) {
3864                         dirty = true;
3865                 } else {
3866                         rc = linkea_links_find(&ldata, cname, pfid);
3867                         if (rc != 0)
3868                                 dirty = true;
3869                 }
3870         }
3871
3872         if (!dirty)
3873                 RETURN(rc);
3874
3875         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
3876         if (rc != 0)
3877                 RETURN(rc);
3878
3879         rc = linkea_add_buf(&ldata, cname, pfid);
3880         if (rc != 0)
3881                 RETURN(rc);
3882
3883         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
3884                        ldata.ld_leh->leh_len);
3885         th = dt_trans_create(env, dev);
3886         if (IS_ERR(th))
3887                 RETURN(PTR_ERR(th));
3888
3889         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
3890                                   XATTR_NAME_LINK, fl, th);
3891         if (rc != 0)
3892                 GOTO(stop, rc);
3893
3894         rc = dt_trans_start_local(env, dev, th);
3895         if (rc != 0)
3896                 GOTO(stop, rc);
3897
3898         dt_write_lock(env, obj, 0);
3899         rc = dt_xattr_set(env, obj, &linkea_buf,
3900                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
3901         dt_write_unlock(env, obj);
3902
3903         GOTO(stop, rc);
3904
3905 stop:
3906         dt_trans_stop(env, dev, th);
3907         return rc;
3908 }
3909
3910 /**
3911  * Get the name and parent directory's FID from the first linkEA entry.
3912  *
3913  * \param[in] env       pointer to the thread context
3914  * \param[in] obj       pointer to the object which get linkEA from
3915  * \param[out] name     pointer to the buffer to hold the name
3916  *                      in the first linkEA entry
3917  * \param[out] pfid     pointer to the buffer to hold the parent
3918  *                      directory's FID in the first linkEA entry
3919  *
3920  * \retval              0 for success
3921  * \retval              negative error number on failure
3922  */
3923 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
3924                           char *name, struct lu_fid *pfid)
3925 {
3926         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
3927         struct linkea_data        ldata = { 0 };
3928         int                       rc;
3929
3930         rc = lfsck_links_read(env, obj, &ldata);
3931         if (rc != 0)
3932                 return rc;
3933
3934         linkea_first_entry(&ldata);
3935         if (ldata.ld_lee == NULL)
3936                 return -ENODATA;
3937
3938         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
3939         /* To guarantee the 'name' is terminated with '0'. */
3940         memcpy(name, cname->ln_name, cname->ln_namelen);
3941         name[cname->ln_namelen] = 0;
3942
3943         return 0;
3944 }
3945
3946 /**
3947  * Remove the name entry from the parent directory.
3948  *
3949  * No need to care about the object referenced by the name entry,
3950  * either the name entry is invalid or redundant, or the referenced
3951  * object has been processed has been or will be handled by others.
3952  *
3953  * \param[in] env       pointer to the thread context
3954  * \param[in] lfsck     pointer to the lfsck instance
3955  * \param[in] parent    pointer to the lost+found object
3956  * \param[in] name      the name for the name entry to be removed
3957  * \param[in] type      the type for the name entry to be removed
3958  *
3959  * \retval              0 for success
3960  * \retval              negative error number on failure
3961  */
3962 int lfsck_remove_name_entry(const struct lu_env *env,
3963                             struct lfsck_instance *lfsck,
3964                             struct dt_object *parent,
3965                             const char *name, __u32 type)
3966 {
3967         struct dt_device        *dev    = lfsck->li_next;
3968         struct thandle          *th;
3969         struct lustre_handle     lh     = { 0 };
3970         int                      rc;
3971         ENTRY;
3972
3973         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3974                               MDS_INODELOCK_UPDATE, LCK_EX);
3975         if (rc != 0)
3976                 RETURN(rc);
3977
3978         th = dt_trans_create(env, dev);
3979         if (IS_ERR(th))
3980                 GOTO(unlock, rc = PTR_ERR(th));
3981
3982         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
3983         if (rc != 0)
3984                 GOTO(stop, rc);
3985
3986         if (S_ISDIR(type)) {
3987                 rc = dt_declare_ref_del(env, parent, th);
3988                 if (rc != 0)
3989                         GOTO(stop, rc);
3990         }
3991
3992         rc = dt_trans_start(env, dev, th);
3993         if (rc != 0)
3994                 GOTO(stop, rc);
3995
3996         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
3997                        BYPASS_CAPA);
3998         if (rc != 0)
3999                 GOTO(stop, rc);
4000
4001         if (S_ISDIR(type)) {
4002                 dt_write_lock(env, parent, 0);
4003                 rc = dt_ref_del(env, parent, th);
4004                 dt_write_unlock(env, parent);
4005         }
4006
4007         GOTO(stop, rc);
4008
4009 stop:
4010         dt_trans_stop(env, dev, th);
4011
4012 unlock:
4013         lfsck_ibits_unlock(&lh, LCK_EX);
4014
4015         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
4016                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4017                PFID(lfsck_dto2fid(parent)), name, type, rc);
4018
4019         return rc;
4020 }
4021
4022 /**
4023  * Update the object's name entry with the given FID.
4024  *
4025  * \param[in] env       pointer to the thread context
4026  * \param[in] lfsck     pointer to the lfsck instance
4027  * \param[in] parent    pointer to the parent directory that holds
4028  *                      the name entry
4029  * \param[in] name      the name for the entry to be updated
4030  * \param[in] pfid      the new PFID for the name entry
4031  * \param[in] type      the type for the name entry to be updated
4032  *
4033  * \retval              0 for success
4034  * \retval              negative error number on failure
4035  */
4036 int lfsck_update_name_entry(const struct lu_env *env,
4037                             struct lfsck_instance *lfsck,
4038                             struct dt_object *parent, const char *name,
4039                             const struct lu_fid *pfid, __u32 type)
4040 {
4041         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
4042         struct dt_device        *dev    = lfsck->li_next;
4043         struct lustre_handle     lh     = { 0 };
4044         struct thandle          *th;
4045         int                      rc;
4046         bool                     exists = true;
4047         ENTRY;
4048
4049         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4050                               MDS_INODELOCK_UPDATE, LCK_EX);
4051         if (rc != 0)
4052                 RETURN(rc);
4053
4054         th = dt_trans_create(env, dev);
4055         if (IS_ERR(th))
4056                 GOTO(unlock, rc = PTR_ERR(th));
4057
4058         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
4059         if (rc != 0)
4060                 GOTO(stop, rc);
4061
4062         rec->rec_type = type;
4063         rec->rec_fid = pfid;
4064         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
4065                                (const struct dt_key *)name, th);
4066         if (rc != 0)
4067                 GOTO(stop, rc);
4068
4069         rc = dt_declare_ref_add(env, parent, th);
4070         if (rc != 0)
4071                 GOTO(stop, rc);
4072
4073         rc = dt_trans_start(env, dev, th);
4074         if (rc != 0)
4075                 GOTO(stop, rc);
4076
4077         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
4078                        BYPASS_CAPA);
4079         if (rc == -ENOENT) {
4080                 exists = false;
4081                 rc = 0;
4082         }
4083
4084         if (rc != 0)
4085                 GOTO(stop, rc);
4086
4087         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
4088                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
4089         if (rc == 0 && S_ISDIR(type) && !exists) {
4090                 dt_write_lock(env, parent, 0);
4091                 rc = dt_ref_add(env, parent, th);
4092                 dt_write_unlock(env, parent);
4093         }
4094
4095         GOTO(stop, rc);
4096
4097 stop:
4098         dt_trans_stop(env, dev, th);
4099
4100 unlock:
4101         lfsck_ibits_unlock(&lh, LCK_EX);
4102
4103         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
4104                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4105                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
4106
4107         return rc;
4108 }
4109
4110 int lfsck_namespace_setup(const struct lu_env *env,
4111                           struct lfsck_instance *lfsck)
4112 {
4113         struct lfsck_component  *com;
4114         struct lfsck_namespace  *ns;
4115         struct dt_object        *root = NULL;
4116         struct dt_object        *obj;
4117         int                      rc;
4118         ENTRY;
4119
4120         LASSERT(lfsck->li_master);
4121
4122         OBD_ALLOC_PTR(com);
4123         if (com == NULL)
4124                 RETURN(-ENOMEM);
4125
4126         INIT_LIST_HEAD(&com->lc_link);
4127         INIT_LIST_HEAD(&com->lc_link_dir);
4128         init_rwsem(&com->lc_sem);
4129         atomic_set(&com->lc_ref, 1);
4130         com->lc_lfsck = lfsck;
4131         com->lc_type = LFSCK_TYPE_NAMESPACE;
4132         com->lc_ops = &lfsck_namespace_ops;
4133         com->lc_data = lfsck_assistant_data_init(
4134                         &lfsck_namespace_assistant_ops,
4135                         "lfsck_namespace");
4136         if (com->lc_data == NULL)
4137                 GOTO(out, rc = -ENOMEM);
4138
4139         com->lc_file_size = sizeof(struct lfsck_namespace);
4140         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
4141         if (com->lc_file_ram == NULL)
4142                 GOTO(out, rc = -ENOMEM);
4143
4144         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
4145         if (com->lc_file_disk == NULL)
4146                 GOTO(out, rc = -ENOMEM);
4147
4148         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
4149         if (IS_ERR(root))
4150                 GOTO(out, rc = PTR_ERR(root));
4151
4152         if (unlikely(!dt_try_as_dir(env, root)))
4153                 GOTO(out, rc = -ENOTDIR);
4154
4155         obj = local_index_find_or_create(env, lfsck->li_los, root,
4156                                          lfsck_namespace_name,
4157                                          S_IFREG | S_IRUGO | S_IWUSR,
4158                                          &dt_lfsck_features);
4159         if (IS_ERR(obj))
4160                 GOTO(out, rc = PTR_ERR(obj));
4161
4162         com->lc_obj = obj;
4163         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
4164         if (rc != 0)
4165                 GOTO(out, rc);
4166
4167         rc = lfsck_namespace_load(env, com);
4168         if (rc > 0)
4169                 rc = lfsck_namespace_reset(env, com, true);
4170         else if (rc == -ENODATA)
4171                 rc = lfsck_namespace_init(env, com);
4172         if (rc != 0)
4173                 GOTO(out, rc);
4174
4175         ns = com->lc_file_ram;
4176         switch (ns->ln_status) {
4177         case LS_INIT:
4178         case LS_COMPLETED:
4179         case LS_FAILED:
4180         case LS_STOPPED:
4181                 spin_lock(&lfsck->li_lock);
4182                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4183                 spin_unlock(&lfsck->li_lock);
4184                 break;
4185         default:
4186                 CERROR("%s: unknown lfsck_namespace status %d\n",
4187                        lfsck_lfsck2name(lfsck), ns->ln_status);
4188                 /* fall through */
4189         case LS_SCANNING_PHASE1:
4190         case LS_SCANNING_PHASE2:
4191                 /* No need to store the status to disk right now.
4192                  * If the system crashed before the status stored,
4193                  * it will be loaded back when next time. */
4194                 ns->ln_status = LS_CRASHED;
4195                 /* fall through */
4196         case LS_PAUSED:
4197         case LS_CRASHED:
4198                 spin_lock(&lfsck->li_lock);
4199                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
4200                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
4201                 spin_unlock(&lfsck->li_lock);
4202                 break;
4203         }
4204
4205         GOTO(out, rc = 0);
4206
4207 out:
4208         if (root != NULL && !IS_ERR(root))
4209                 lu_object_put(env, &root->do_lu);
4210         if (rc != 0) {
4211                 lfsck_component_cleanup(env, com);
4212                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
4213                        lfsck_lfsck2name(lfsck), rc);
4214         }
4215         return rc;
4216 }